Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework token acquire/refresh logic #282

Merged
merged 4 commits into from
Jan 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 82 additions & 44 deletions custom_components/polestar_api/pypolestar/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,57 +101,44 @@ def is_token_valid(self) -> bool:
and self.token_expiry > datetime.now(tz=timezone.utc)
)

async def get_token(self, refresh=False) -> None:
"""Get the token from Polestar."""
async def get_token(self, force: bool = False) -> None:
"""Ensure we have a valid access token (still valid, refreshed or initial)."""

if (
not refresh
or self.token_expiry is None
or self.token_expiry < datetime.now(tz=timezone.utc)
not force
and self.access_token is not None
and self.token_expiry
and self.token_expiry > datetime.now(tz=timezone.utc)
):
if (code := await self._get_code()) is None:
return

token_request = {
"grant_type": "authorization_code",
"client_id": OIDC_CLIENT_ID,
"code": code,
"redirect_uri": OIDC_REDIRECT_URI,
**(
{
"code_verifier": self.oidc_code_verifier,
}
if self.oidc_code_verifier
else {}
),
}

elif self.refresh_token:
token_request = {
"grant_type": "refresh_token",
"client_id": OIDC_CLIENT_ID,
"refresh_token": self.refresh_token,
}
else:
self.logger.debug("Token still valid until %s", self.token_expiry)
return

self.logger.debug(
"Call token endpoint with grant_type=%s", token_request["grant_type"]
)
if self.refresh_token:
try:
await self._token_refresh()
self.logger.debug("Token refreshed")
except Exception as exc:
self.logger.warning(
"Failed to refresh token, retry with code", exc_info=exc
)

try:
result = await self.client_session.post(
self.oidc_configuration["token_endpoint"],
data=token_request,
timeout=HTTPX_TIMEOUT,
)
await self._authorization_code()
self.logger.debug("Initial token acquired")
return
except Exception as exc:
self.latest_call_code = None
self.logger.error("Auth Token Error: %s", str(exc))
raise PolestarAuthException("Error getting token") from exc
raise PolestarAuthException("Unable to acquire initial token") from exc

payload = result.json()
self.latest_call_code = result.status_code
def _parse_token_response(self, response: httpx.Response) -> None:
"""Parse response from token endpoint and update token state."""

self.latest_call_code = response.status_code

payload = response.json()

if "error" in payload:
self.logger.error("Token error: %s", payload)
raise PolestarAuthException("Token error", response.status_code)

try:
self.access_token = payload["access_token"]
Expand All @@ -161,11 +148,62 @@ async def get_token(self, refresh=False) -> None:
seconds=self.token_lifetime
)
except KeyError as exc:
self.logger.error("Token response missing expected keys: %s", exc)
raise PolestarAuthException("Incomplete token response") from exc
self.logger.error("Token response missing key: %s", exc)
raise PolestarAuthException("Token response missing key") from exc

self.logger.debug("Access token updated, valid until %s", self.token_expiry)

async def _authorization_code(self) -> None:
"""Get initial token via authorization code."""

if (code := await self._get_code()) is None:
raise PolestarAuthException("Unable to get code")

token_request = {
"grant_type": "authorization_code",
"client_id": OIDC_CLIENT_ID,
"code": code,
"redirect_uri": OIDC_REDIRECT_URI,
**(
{"code_verifier": self.oidc_code_verifier}
if self.oidc_code_verifier
else {}
),
}

self.logger.debug(
"Call token endpoint with grant_type=%s", token_request["grant_type"]
)

response = await self.client_session.post(
self.oidc_configuration["token_endpoint"],
data=token_request,
timeout=HTTPX_TIMEOUT,
)
response.raise_for_status()
self._parse_token_response(response)

async def _token_refresh(self) -> None:
"""Refresh existing token."""

token_request = {
"grant_type": "refresh_token",
"client_id": OIDC_CLIENT_ID,
"refresh_token": self.refresh_token,
}

self.logger.debug(
"Call token endpoint with grant_type=%s", token_request["grant_type"]
jschlyter marked this conversation as resolved.
Show resolved Hide resolved
)

response = await self.client_session.post(
self.oidc_configuration["token_endpoint"],
data=token_request,
timeout=HTTPX_TIMEOUT,
)
response.raise_for_status()
self._parse_token_response(response)

async def _get_code(self) -> str | None:
query_params = await self._get_resume_path()

Expand Down
2 changes: 1 addition & 1 deletion custom_components/polestar_api/pypolestar/polestar.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ async def get_ev_data(self, vin: str) -> None:

try:
if self.auth.need_token_refresh():
await self.auth.get_token(refresh=True)
await self.auth.get_token()
except PolestarAuthException as e:
self.latest_call_code = 500
self.logger.warning("Auth Exception: %s", str(e))
Expand Down
Loading