Skip to content

Commit

Permalink
[Fix] Launch without Bluetooth service
Browse files Browse the repository at this point in the history
  • Loading branch information
melianmiko committed Sep 23, 2024
1 parent 64d343a commit 80653e0
Showing 1 changed file with 19 additions and 16 deletions.
35 changes: 19 additions & 16 deletions openfreebuds_backend/linux/bluez_io.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio
import logging
from contextlib import suppress
from typing import Optional

from dbus_next import BusType
from dbus_next.errors import DBusError
Expand All @@ -18,33 +20,33 @@ class DBusConn:
async def prepare():
if DBusConn.system is None:
DBusConn.system = await MessageBus(bus_type=BusType.SYSTEM).connect()
if DBusConn.bluez_introspect is None or DBusConn.bluez is None:
DBusConn.bluez_introspect = await DBusConn.system.introspect("org.bluez", "/")
DBusConn.bluez = DBusConn.system.get_proxy_object("org.bluez", "/",
DBusConn.bluez_introspect)


# noinspection PyUnresolvedReferences
async def bt_is_connected(address):
try:
with suppress(AttributeError):
device, device_introspect = await _dbus_find_bt_device(address)
if device is None:
return None

interface = device.get_interface("org.bluez.Device1")
return await interface.get_connected()
except AttributeError:
log.exception(f"Failed to check connection state of {address}")
return False

return False


# noinspection PyUnresolvedReferences
async def bt_connect(address):
try:
device, device_introspect = await _dbus_find_bt_device(address)
if device is None:
return None

interface = device.get_interface("org.bluez.Device1")
# noinspection PyUnresolvedReferences
await interface.call_connect()
await asyncio.sleep(1)
return True
Expand All @@ -53,14 +55,14 @@ async def bt_connect(address):
return False


# noinspection PyUnresolvedReferences
async def bt_disconnect(address):
try:
device, device_introspect = await _dbus_find_bt_device(address)
if device is None:
return None

interface = device.get_interface("org.bluez.Device1")
# noinspection PyUnresolvedReferences
await interface.call_disconnect()
await asyncio.sleep(1)
return True
Expand All @@ -69,12 +71,13 @@ async def bt_disconnect(address):
return False


# noinspection PyUnresolvedReferences
async def bt_list_devices():
scan_results = []

try:
with suppress(DBusError):
await DBusConn.prepare()
if DBusConn.bluez is None:
return scan_results
obj_manager = DBusConn.bluez.get_interface("org.freedesktop.DBus.ObjectManager")
# noinspection PyUnresolvedReferences
all_objects = await obj_manager.call_get_managed_objects()
Expand All @@ -84,6 +87,7 @@ async def bt_list_devices():
device_introspect = await DBusConn.system.introspect("org.bluez", path)
device = DBusConn.system.get_proxy_object("org.bluez", path, device_introspect)
properties = device.get_interface("org.freedesktop.DBus.Properties")
# noinspection PyUnresolvedReferences
props = await properties.call_get_all("org.bluez.Device1")

try:
Expand All @@ -94,16 +98,15 @@ async def bt_list_devices():
})
except AttributeError:
log.debug(f"Skip broken bluez device {path}")
except DBusError:
log.exception("Failed to list devices")

return scan_results


# noinspection PyUnresolvedReferences
async def _dbus_find_bt_device(address: str) -> tuple[ProxyObject, Node]:
try:
async def _dbus_find_bt_device(address: str) -> tuple[Optional[ProxyObject], Optional[Node]]:
with suppress(DBusError):
await DBusConn.prepare()
if DBusConn.bluez is None:
return None, None
obj_manager = DBusConn.bluez.get_interface("org.freedesktop.DBus.ObjectManager")
# noinspection PyUnresolvedReferences
all_objects = await obj_manager.call_get_managed_objects()
Expand All @@ -113,16 +116,16 @@ async def _dbus_find_bt_device(address: str) -> tuple[ProxyObject, Node]:
device_introspect = await DBusConn.system.introspect("org.bluez", path)
device = DBusConn.system.get_proxy_object("org.bluez", path, device_introspect)
properties = device.get_interface("org.freedesktop.DBus.Properties")
# noinspection PyUnresolvedReferences
addr = await properties.call_get("org.bluez.Device1", "Address")

try:
if addr.value == address:
return device, device_introspect
except AttributeError:
pass
except DBusError:
log.exception("Failed to find device")
return None, None

return None, None


if __name__ == "__main__":
Expand Down

0 comments on commit 80653e0

Please sign in to comment.