Skip to content

Commit

Permalink
Replace flake8 and isort with ruff lint (#563)
Browse files Browse the repository at this point in the history
* Replace flake8 and isort with ruff lint

* Fix tests

* Automatic fixes

* Remaining fixes
  • Loading branch information
KapJI authored Sep 18, 2024
1 parent ce652f9 commit ecdac47
Show file tree
Hide file tree
Showing 22 changed files with 364 additions and 410 deletions.
3 changes: 1 addition & 2 deletions .github/actions/install-deps/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,4 @@ runs:
- name: Install dependencies with uv
shell: bash
run: |
uv sync
run: uv sync
11 changes: 3 additions & 8 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ repos:
language: system
types: [python]
require_serial: true
- id: isort
name: isort
entry: uv run isort
- id: ruff
name: ruff check
entry: uv run ruff check --fix
language: system
types: [python]
require_serial: true
Expand All @@ -28,11 +28,6 @@ repos:
language: system
types: [python]
require_serial: true
- id: flake8
name: flake8
entry: uv run flake8
language: system
types: [python]
- id: pylint
name: pylint
entry: uv run pylint
Expand Down
2 changes: 1 addition & 1 deletion example/get_tokens.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Example to fetch local authentication tokens"""
"""Example to fetch local authentication tokens."""

import json
from os import getenv
Expand Down
1 change: 1 addition & 0 deletions glocaltokens/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Glocaltokens library."""
82 changes: 36 additions & 46 deletions glocaltokens/client.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
"""Client"""
"""Client."""

from __future__ import annotations

from datetime import datetime
import json
import logging
import random
from typing import TYPE_CHECKING

from ghome_foyer_api.api_pb2 import ( # pylint: disable=no-name-in-module
GetHomeGraphRequest,
Expand All @@ -14,7 +15,6 @@
from ghome_foyer_api.api_pb2_grpc import StructuresServiceStub
from gpsoauth import perform_master_login, perform_oauth
import grpc
from zeroconf import Zeroconf

from .const import (
ACCESS_TOKEN_APP_NAME,
Expand All @@ -28,17 +28,21 @@
HOMEGRAPH_DURATION,
)
from .scanner import NetworkDevice, discover_devices
from .types import DeviceDict
from .utils import network as net_utils, token as token_utils
from .utils.logs import censor
from .utils.network import is_valid_ipv4_address

if TYPE_CHECKING:
from zeroconf import Zeroconf

from .types import DeviceDict

logging.basicConfig(level=logging.ERROR)
LOGGER = logging.getLogger(__name__)


class Device:
"""Device representation"""
"""Device representation."""

def __init__(
self,
Expand All @@ -48,9 +52,7 @@ def __init__(
network_device: NetworkDevice | None = None,
hardware: str | None = None,
):
"""
Initializes a Device.
"""
"""Initialize a Device."""
log_prefix = f"[Device - {device_name}(id={device_id})]"
LOGGER.debug("%s Initializing new Device instance", log_prefix)
self.device_id = device_id
Expand Down Expand Up @@ -109,10 +111,11 @@ def __init__(
self.local_auth_token = local_auth_token

def __str__(self) -> str:
"""Return string representation of the device."""
return str(self.as_dict())

def as_dict(self) -> DeviceDict:
"""Dictionary representation"""
"""Return the dictionary representation of the device."""
return {
"device_id": self.device_id,
"device_name": self.device_name,
Expand All @@ -126,7 +129,7 @@ def as_dict(self) -> DeviceDict:


class GLocalAuthenticationTokens:
"""Client"""
"""Client."""

def __init__(
self,
Expand All @@ -136,18 +139,14 @@ def __init__(
android_id: str | None = None,
verbose: bool = False,
):
"""
Initialize an GLocalAuthenticationTokens instance with google account
credentials
:params
username: google account username;
password: google account password (can be an app password);
master_token: google master token (instead of username/password
combination);
android_id: the id of an android device. Will be randomly generated
if not set;
verbose: whether or not print debug logging information
"""Initialize a GLocalAuthenticationTokens instance with Google account credentials.
:params
username: Google account username;
password: Google account password (can be an app password);
master_token: Google master token (instead of username/password combination);
android_id: The ID of an Android device. Will be randomly generated if not set;
verbose: Whether or not to print debug logging information.
"""
self.logging_level = logging.DEBUG if verbose else logging.ERROR
LOGGER.setLevel(self.logging_level)
Expand Down Expand Up @@ -190,7 +189,7 @@ def __init__(

@staticmethod
def _generate_android_id() -> str:
"""Generate random 16 char long string"""
"""Generate random 16 char long string."""
LOGGER.debug("Generating android id...")
mac_string = "".join(
[f"{random.randrange(16):x}" for _ in range(ANDROID_ID_LENGTH)]
Expand All @@ -199,24 +198,24 @@ def _generate_android_id() -> str:
return mac_string

def get_android_id(self) -> str:
"""Return existing or generate android id"""
"""Return existing or generate android id."""
if not self.android_id:
LOGGER.debug("There is no stored android_id, generating a new one")
self.android_id = self._generate_android_id()
return self.android_id

@staticmethod
def _has_expired(creation_dt: datetime, duration: int) -> bool:
"""Checks if an specified token/object has expired"""
"""Check if an specified token/object has expired."""
return datetime.now().timestamp() - creation_dt.timestamp() > duration

@staticmethod
def _escape_username(username: str) -> str:
"""Escape plus sign for some exotic accounts"""
"""Escape plus sign for some exotic accounts."""
return username.replace("+", "%2B")

def get_master_token(self) -> str | None:
"""Get google master token from username and password"""
"""Get google master token from username and password."""
if self.username is None or self.password is None:
LOGGER.error("Username and password are not set.")
return None
Expand All @@ -234,7 +233,7 @@ def get_master_token(self) -> str | None:
self.get_android_id(),
)
except ValueError:
LOGGER.error(
LOGGER.exception(
"A ValueError exception has been thrown, this usually is related"
"to a password length that exceeds the boundaries (too long)."
)
Expand All @@ -247,7 +246,7 @@ def get_master_token(self) -> str | None:
return self.master_token

def get_access_token(self) -> str | None:
"""Return existing or fetch access_token"""
"""Return existing or fetch access_token."""
if (
self.access_token is None
or self.access_token_date is None
Expand Down Expand Up @@ -286,7 +285,7 @@ def get_access_token(self) -> str | None:
return self.access_token

def get_homegraph(self, auth_attempts: int = 3) -> GetHomeGraphResponse | None:
"""Returns the entire Google Home Foyer V2 service"""
"""Return the entire Google Home Foyer V2 service."""
if (
self.homegraph is None
or self.homegraph_date is None
Expand Down Expand Up @@ -342,7 +341,7 @@ def get_homegraph(self, auth_attempts: int = 3) -> GetHomeGraphResponse | None:
)
self.invalidate_access_token()
return self.get_homegraph(auth_attempts - 1)
LOGGER.error(
LOGGER.exception(
"%s Received unknown RPC error: code=%s message=%s",
log_prefix,
rpc_error.code(), # pylint: disable=no-member
Expand All @@ -360,9 +359,7 @@ def get_google_devices(
force_homegraph_reload: bool = False,
discovery_timeout: int = DISCOVERY_TIMEOUT,
) -> list[Device]:
"""
Returns a list of google devices with their local authentication tokens,
and IP and ports if set in models_list.
"""Return a list of Google devices with their local authentication tokens, IP, and ports.
models_list: The list of accepted model names.
disable_discovery: Whether or not the device's IP and port should
Expand Down Expand Up @@ -397,11 +394,9 @@ def is_dict_with_valid_ipv4_addresses(data: dict[str, str]) -> bool:
)

if addresses and not is_dict_with_valid_ipv4_addresses(addresses):
# We need to disable flake8-use-fstring because of the brackets,
# it causes a false positive.
LOGGER.error(
"Invalid dictionary structure for addresses dictionary "
"argument. Correct structure is {'device_name': 'ipaddress'}" # noqa
"argument. Correct structure is {'device_name': 'ipaddress'}"
)
return devices

Expand Down Expand Up @@ -492,12 +487,10 @@ def get_google_devices_json(
zeroconf_instance: Zeroconf | None = None,
force_homegraph_reload: bool = False,
) -> str:
"""
Returns a json list of google devices with their local authentication tokens,
and IP and ports if set in models_list.
"""Return a JSON list of devices with authentication tokens, IPs, and ports if models set.
models_list: The list of accepted model names.
indent: The indentation for the json formatting.
indent: The indentation for the JSON formatting.
disable_discovery: Whether or not the device's IP and port should
be searched for in the network.
addresses: Dict of network devices from the local network
Expand All @@ -515,24 +508,21 @@ def get_google_devices_json(
zeroconf_instance=zeroconf_instance,
force_homegraph_reload=force_homegraph_reload,
)
json_string = json.dumps(
[obj.as_dict() for obj in google_devices], indent=indent
)
return json_string
return json.dumps([obj.as_dict() for obj in google_devices], indent=indent)

def invalidate_access_token(self) -> None:
"""Invalidates the current access token"""
"""Invalidate the current access token."""
self.access_token = None
self.access_token_date = None
LOGGER.debug("Invalidated access_token")

def invalidate_master_token(self) -> None:
"""Invalidates the current master token"""
"""Invalidate the current master token."""
self.master_token = None
LOGGER.debug("Invalidated master_token")

def invalidate_homegraph(self) -> None:
"""Invalidates the stored homegraph data"""
"""Invalidate the stored homegraph data."""
self.homegraph = None
self.homegraph_date = None
LOGGER.debug("Invalidated homegraph")
2 changes: 1 addition & 1 deletion glocaltokens/const.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Globally used constants"""
"""Globally used constants."""

from __future__ import annotations

Expand Down
22 changes: 13 additions & 9 deletions glocaltokens/scanner.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
"""Zeroconf based scanner"""
"""Zeroconf based scanner."""

from __future__ import annotations

import logging
from threading import Event
from typing import Callable, NamedTuple
from typing import TYPE_CHECKING, NamedTuple

from zeroconf import ServiceBrowser, ServiceInfo, ServiceListener, Zeroconf

from .const import DISCOVERY_TIMEOUT, GOOGLE_CAST_GROUP
from .utils import network as net_utils

if TYPE_CHECKING:
from collections.abc import Callable

LOGGER = logging.getLogger(__name__)


class NetworkDevice(NamedTuple):
"""Discovered Google device representation"""
"""Discovered Google device representation."""

name: str
ip_address: str
Expand All @@ -25,10 +28,10 @@ class NetworkDevice(NamedTuple):


class CastListener(ServiceListener):
"""
Zeroconf Cast Services collection.
"""Zeroconf Cast Services collection.
Credit (pychromecast):
https://github.com/home-assistant-libs/pychromecast/
https://github.com/home-assistant-libs/pychromecast/.
"""

def __init__(
Expand All @@ -37,6 +40,7 @@ def __init__(
remove_callback: Callable[[], None] | None = None,
update_callback: Callable[[], None] | None = None,
):
"""Create cast listener."""
self.devices: dict[str, NetworkDevice] = {}
self.add_callback = add_callback
self.remove_callback = remove_callback
Expand All @@ -58,7 +62,7 @@ def update_service(self, zc: Zeroconf, type_: str, name: str) -> None:
self._add_update_service(zc, type_, name, self.update_callback)

def remove_service(self, _zc: Zeroconf, type_: str, name: str) -> None:
"""Called when a cast has been lost (mDNS info expired or host down)."""
"""Remove a cast device when its mDNS info expires or the host is down."""
LOGGER.debug("remove_service %s, %s", type_, name)
if name in self.devices:
del self.devices[name]
Expand Down Expand Up @@ -146,13 +150,13 @@ def discover_devices(
zeroconf_instance: Zeroconf | None = None,
logging_level: int = logging.ERROR,
) -> list[NetworkDevice]:
"""Discover devices"""
"""Discover devices."""
LOGGER.setLevel(logging_level)

LOGGER.debug("Discovering devices...")

def callback() -> None:
"""Called when zeroconf has discovered a new device."""
"""Handle the event when zeroconf discovers a new device."""
if max_devices is not None and listener.count >= max_devices:
discovery_complete.set()

Expand Down
2 changes: 1 addition & 1 deletion glocaltokens/types.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Types used for package typing"""
"""Types used for package typing."""

from __future__ import annotations

Expand Down
1 change: 1 addition & 0 deletions glocaltokens/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Utils package."""
5 changes: 2 additions & 3 deletions glocaltokens/utils/logs.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
"""Log utilities"""
"""Log utilities."""

from __future__ import annotations


def censor(
text: str | None, hide_length: bool = False, hide_first_letter: bool = False
) -> str:
"""
Hide sensitive information.
"""Hide sensitive information.
text: The text to censure.
"""
Expand Down
Loading

0 comments on commit ecdac47

Please sign in to comment.