Skip to content

Commit

Permalink
Merge pull request #282 from jschlyter/update_token_refresh
Browse files Browse the repository at this point in the history
Rework token acquire/refresh logic
  • Loading branch information
jschlyter authored Jan 5, 2025
2 parents ca53c5f + edef33d commit f633d20
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 45 deletions.
126 changes: 82 additions & 44 deletions custom_components/polestar_api/pypolestar/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,57 +101,44 @@ def is_token_valid(self) -> bool:
and self.token_expiry > datetime.now(tz=timezone.utc)
)

async def get_token(self, refresh=False) -> None:
"""Get the token from Polestar."""
async def get_token(self, force: bool = False) -> None:
"""Ensure we have a valid access token (still valid, refreshed or initial)."""

if (
not refresh
or self.token_expiry is None
or self.token_expiry < datetime.now(tz=timezone.utc)
not force
and self.access_token is not None
and self.token_expiry
and self.token_expiry > datetime.now(tz=timezone.utc)
):
if (code := await self._get_code()) is None:
return

token_request = {
"grant_type": "authorization_code",
"client_id": OIDC_CLIENT_ID,
"code": code,
"redirect_uri": OIDC_REDIRECT_URI,
**(
{
"code_verifier": self.oidc_code_verifier,
}
if self.oidc_code_verifier
else {}
),
}

elif self.refresh_token:
token_request = {
"grant_type": "refresh_token",
"client_id": OIDC_CLIENT_ID,
"refresh_token": self.refresh_token,
}
else:
self.logger.debug("Token still valid until %s", self.token_expiry)
return

self.logger.debug(
"Call token endpoint with grant_type=%s", token_request["grant_type"]
)
if self.refresh_token:
try:
await self._token_refresh()
self.logger.debug("Token refreshed")
except Exception as exc:
self.logger.warning(
"Failed to refresh token, retry with code", exc_info=exc
)

try:
result = await self.client_session.post(
self.oidc_configuration["token_endpoint"],
data=token_request,
timeout=HTTPX_TIMEOUT,
)
await self._authorization_code()
self.logger.debug("Initial token acquired")
return
except Exception as exc:
self.latest_call_code = None
self.logger.error("Auth Token Error: %s", str(exc))
raise PolestarAuthException("Error getting token") from exc
raise PolestarAuthException("Unable to acquire initial token") from exc

payload = result.json()
self.latest_call_code = result.status_code
def _parse_token_response(self, response: httpx.Response) -> None:
"""Parse response from token endpoint and update token state."""

self.latest_call_code = response.status_code

payload = response.json()

if "error" in payload:
self.logger.error("Token error: %s", payload)
raise PolestarAuthException("Token error", response.status_code)

try:
self.access_token = payload["access_token"]
Expand All @@ -161,11 +148,62 @@ async def get_token(self, refresh=False) -> None:
seconds=self.token_lifetime
)
except KeyError as exc:
self.logger.error("Token response missing expected keys: %s", exc)
raise PolestarAuthException("Incomplete token response") from exc
self.logger.error("Token response missing key: %s", exc)
raise PolestarAuthException("Token response missing key") from exc

self.logger.debug("Access token updated, valid until %s", self.token_expiry)

async def _authorization_code(self) -> None:
"""Get initial token via authorization code."""

if (code := await self._get_code()) is None:
raise PolestarAuthException("Unable to get code")

token_request = {
"grant_type": "authorization_code",
"client_id": OIDC_CLIENT_ID,
"code": code,
"redirect_uri": OIDC_REDIRECT_URI,
**(
{"code_verifier": self.oidc_code_verifier}
if self.oidc_code_verifier
else {}
),
}

self.logger.debug(
"Call token endpoint with grant_type=%s", token_request["grant_type"]
)

response = await self.client_session.post(
self.oidc_configuration["token_endpoint"],
data=token_request,
timeout=HTTPX_TIMEOUT,
)
response.raise_for_status()
self._parse_token_response(response)

async def _token_refresh(self) -> None:
"""Refresh existing token."""

token_request = {
"grant_type": "refresh_token",
"client_id": OIDC_CLIENT_ID,
"refresh_token": self.refresh_token,
}

self.logger.debug(
"Call token endpoint with grant_type=%s", token_request["grant_type"]
)

response = await self.client_session.post(
self.oidc_configuration["token_endpoint"],
data=token_request,
timeout=HTTPX_TIMEOUT,
)
response.raise_for_status()
self._parse_token_response(response)

async def _get_code(self) -> str | None:
query_params = await self._get_resume_path()

Expand Down
2 changes: 1 addition & 1 deletion custom_components/polestar_api/pypolestar/polestar.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ async def get_ev_data(self, vin: str) -> None:

try:
if self.auth.need_token_refresh():
await self.auth.get_token(refresh=True)
await self.auth.get_token()
except PolestarAuthException as e:
self.latest_call_code = 500
self.logger.warning("Auth Exception: %s", str(e))
Expand Down

0 comments on commit f633d20

Please sign in to comment.