Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT] Send keystrokes via BLE GATT server #113

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
24 changes: 24 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Debug client.py (write value)",
"type": "debugpy",
"request": "launch",
"program": "${workspaceRoot}/src/gatt_client/client.py",
"console": "integratedTerminal",
"args" : ["-a", "B8:27:EB:9C:F6:4C", "-c", "00000001-6907-4437-8539-9218a9d54e29", "Win"]
},
{
"name": "client.py (help)",
"type": "debugpy",
"request": "launch",
"program": "${workspaceRoot}/src/gatt_client/client.py",
"console": "integratedTerminal",
"args" : ["--help"]
}
]
}
258 changes: 217 additions & 41 deletions README.md

Large diffs are not rendered by default.

15 changes: 13 additions & 2 deletions bluetooth_2_usb.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from src.bluetooth_2_usb.args import parse_args
from src.bluetooth_2_usb.logging import add_file_handler, get_logger
from src.bluetooth_2_usb.relay import RelayController, async_list_input_devices
from src.bluetooth_2_usb.relay_ble import RelayBleController


logger = get_logger()
Expand Down Expand Up @@ -49,9 +50,19 @@ async def main() -> NoReturn:
logger.debug(log_handlers_message)
logger.info(f"Launching {VERSIONED_NAME}")

controller = RelayController(args.device_ids, args.auto_discover, args.grab_devices)
await controller.async_relay_devices()
tasks = []
if args.no_input_relay & args.no_ble_relay:
raise RuntimeError("Both input and BLE realys are disabled.")

if not args.no_input_relay:
input_controller = RelayController(args.device_ids, args.auto_discover, args.grab_devices)
tasks.append(input_controller.async_relay_devices())

if not args.no_ble_relay:
ble_controller = RelayBleController(args.accept_non_trusted, args.partial_parse_ble_command)
tasks.append(ble_controller.async_relay_ble())

await asyncio.gather(*tasks)

async def async_list_devices():
for dev in await async_list_input_devices():
Expand Down
3 changes: 3 additions & 0 deletions requirements.client.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
quax-circuitpython-hid==6.0.2.post1
bleak==0.21.1
parameterized==0.9.0
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ quax-circuitpython-hid==6.0.2.post1
rpi-ws281x==5.0.0
RPi.GPIO==0.7.1
sysv-ipc==1.1.0
dbus-next==0.2.3
bless==0.2.5
15 changes: 8 additions & 7 deletions scripts/update.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env bash
# Update Bluetooth 2 USB to the latest stable GitHub version. Handles updating submodules, if required.
# Update Bluetooth 2 USB to the latest stable GitHub version. Handles updating submodules, if required.

# Temporarily disable history expansion
set +H
Expand Down Expand Up @@ -56,12 +56,13 @@ current_group=$(stat -c '%G' .) || abort_update "Failed retrieving current group
current_branch=$(git symbolic-ref --short HEAD) || abort_update "Failed retrieving currently checked out branch."

{
scripts/uninstall.sh &&
cd .. &&
rm -rf "${base_directory}" &&
git clone https://github.com/quaxalber/bluetooth_2_usb.git &&
cd "${base_directory}" &&
scripts/uninstall.sh &&
cd .. &&
rm -rf "${base_directory}" &&
# git clone https://github.com/quaxalber/bluetooth_2_usb.git &&
git clone https://github.com/ig-sinicyn/bluetooth_2_usb.git &&
cd "${base_directory}" &&
git checkout "${current_branch}" &&
chown -R ${current_user}:${current_group} "${base_directory}" &&
scripts/install.sh ;
scripts/install.sh ;
} || abort_update "Failed updating Bluetooth 2 USB"
58 changes: 57 additions & 1 deletion src/bluetooth_2_usb/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def __init__(self, *args, **kwargs) -> None:
super().__init__(
*args,
add_help=False,
description="Bluetooth to USB HID relay. Handles Bluetooth keyboard and mouse events from multiple input devices and translates them to USB using Linux's gadget mode.",
description="Bluetooth to USB HID relay. Handles Bluetooth keyboard and mouse events from multiple input devices and translates them to USB using Linux's gadget mode. Handles keycodes sent to BLE GATT characteristic and translates them to USB using Linux's gadget mode.",
formatter_class=argparse.RawTextHelpFormatter,
**kwargs,
)
Expand Down Expand Up @@ -47,6 +47,30 @@ def _add_arguments(self) -> None:
default=False,
help="List all available input devices and exit.",
)
self.add_argument(
"--no-input-relay",
action="store_true",
default=False,
help="Disable input relay mode (sends input keys to USB HID device)\nDefault: input relay enabled.",
)
self.add_argument(
"--no-ble-relay",
action="store_true",
default=False,
help="Disable BLE relay mode (BLE server that sends keystrokes to USB HID device)\nDefault: BLE relay enabled.",
)
self.add_argument(
"--accept-non-trusted",
action="store_true",
default=False,
help="UNSAFE! Accepts non-trusted BLE relay clients.",
)
self.add_argument(
"--partial-parse-ble-command",
action="store_true",
default=False,
help="Enables partial parsing of GATT characteristic input (ignores unknown key names).",
)
self.add_argument(
"--log_to_file",
"-f",
Expand Down Expand Up @@ -104,6 +128,10 @@ class Arguments:
"_auto_discover",
"_grab_devices",
"_list_devices",
"_no_input_relay",
"_no_ble_relay",
"_accept_non_trusted",
"_partial_parse_ble_command",
"_log_to_file",
"_log_path",
"_debug",
Expand All @@ -116,6 +144,10 @@ def __init__(
auto_discover: bool,
grab_devices: bool,
list_devices: bool,
no_input_relay: bool,
no_ble_relay: bool,
accept_non_trusted: bool,
partial_parse_ble_command: bool,
log_to_file: bool,
log_path: str,
debug: bool,
Expand All @@ -125,6 +157,10 @@ def __init__(
self._auto_discover = auto_discover
self._grab_devices = grab_devices
self._list_devices = list_devices
self._no_input_relay = no_input_relay
self._no_ble_relay = no_ble_relay
self._accept_non_trusted = accept_non_trusted
self._partial_parse_ble_command = partial_parse_ble_command
self._log_to_file = log_to_file
self._log_path = log_path
self._debug = debug
Expand All @@ -146,6 +182,22 @@ def grab_devices(self) -> bool:
def list_devices(self) -> bool:
return self._list_devices

@property
def no_input_relay(self) -> bool:
return self._no_input_relay

@property
def no_ble_relay(self) -> bool:
return self._no_ble_relay

@property
def accept_non_trusted(self) -> bool:
return self._accept_non_trusted

@property
def partial_parse_ble_command(self) -> bool:
return self._partial_parse_ble_command

@property
def log_to_file(self) -> bool:
return self._log_to_file
Expand Down Expand Up @@ -182,6 +234,10 @@ def parse_args() -> Arguments:
auto_discover=args.auto_discover,
grab_devices=args.grab_devices,
list_devices=args.list_devices,
no_input_relay=args.no_input_relay,
no_ble_relay=args.no_ble_relay,
accept_non_trusted=args.accept_non_trusted,
partial_parse_ble_command=args.partial_parse_ble_command,
log_to_file=args.log_to_file,
log_path=args.log_path,
debug=args.debug,
Expand Down
170 changes: 170 additions & 0 deletions src/bluetooth_2_usb/relay_ble.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import asyncio
from enum import Flag
from typing import NoReturn, Dict
from adafruit_hid.keyboard import Keyboard
import usb_hid
from usb_hid import Device

from bless import (
BlessServer,
BlessGATTCharacteristic,
GATTAttributePermissions
)
from bleak.backends.bluezdbus.characteristic import ( # type: ignore
_GattCharacteristicsFlagsEnum
)

from .logging import get_logger

from src.bluetooth_2_usb.shortcut_parser import ShortcutParser
from src.bluetooth_2_usb.relay import (all_gadgets_ready, init_usb_gadgets)


class CustomGATTCharacteristicProperties(Flag):
broadcast = 0x00001
read = 0x00002
write_without_response = 0x00004
write = 0x00008
notify = 0x00010
indicate = 0x00020
authenticated_signed_writes = 0x00040
extended_properties = 0x00080
reliable_write = 0x00100
writable_auxiliaries = 0x00200
encrypt_read = 0x00400
encrypt_write = 0x00800
encrypt_authenticated_read = 0x01000
encrypt_authenticated_write = 0x02000
secure_read = 0x04000 #(Server only)
secure_write = 0x08000 #(Server only)
authorize = 0x10000


# HACK: redefine disabled characteristic mapping for bless to bluezdbus backend
# see https://github.com/hbldh/bleak/blob/master/bleak/backends/bluezdbus/characteristic.py#L20-L26
_AddCustomGattCharacteristicsFlagsEnum: dict[int, str] = {
0x00400: "encrypt-read",
0x00800: "encrypt-write",
0x01000: "encrypt-authenticated-read",
0x02000: "encrypt-authenticated-write",
0x04000: "secure-read", #(Server only)
0x08000: "secure-write", #(Server only)
0x10000: "authorize",
}
for key in _AddCustomGattCharacteristicsFlagsEnum:
_GattCharacteristicsFlagsEnum[key] = _AddCustomGattCharacteristicsFlagsEnum[key]

_logger = get_logger()

GATT_SERVER_NAME = f"Bluetooth 2 USB"
GATT_SERVICE_ID = "00000000-6907-4437-8539-9218a9d54e29"
GATT_CHARACTERISTIC_ID = "00000001-6907-4437-8539-9218a9d54e29"


class BleRelay:

def __init__(
self,
accept_non_trusted: bool = False,
partial_parse: bool = False) -> None:
self._accept_non_trusted = accept_non_trusted
self._partial_parse = partial_parse
self._shortcut_parser = ShortcutParser()
if not all_gadgets_ready():
init_usb_gadgets()
enabled_devices: list[Device] = list(usb_hid.devices) # type: ignore
self._keyboard_gadget = Keyboard(enabled_devices)

def __str__(self) -> str:
return "BLE TO HID relay"

async def async_relay_events_loop(self) -> NoReturn:
gatt_properties = (CustomGATTCharacteristicProperties.encrypt_authenticated_read
| CustomGATTCharacteristicProperties.encrypt_authenticated_write)
gatt_permissions = (GATTAttributePermissions.read_encryption_required
| GATTAttributePermissions.write_encryption_required)

if (self._accept_non_trusted):
gatt_properties = CustomGATTCharacteristicProperties.read | CustomGATTCharacteristicProperties.write
gatt_permissions = GATTAttributePermissions.readable | GATTAttributePermissions.writeable

# Instantiate the server
gatt: Dict = {
GATT_SERVICE_ID: {
GATT_CHARACTERISTIC_ID: {
"Properties": gatt_properties,
"Permissions": gatt_permissions,
"Value": None
},
}
}
server = BlessServer(name=GATT_SERVER_NAME)
server.read_request_func = self._read_request
server.write_request_func = self._write_request

_logger.debug("Starting GATT server")
await server.add_gatt(gatt)
await server.start()
_logger.debug("GATT server started")

try:
while True:
await asyncio.sleep(0.5)
except* Exception:
_logger.debug("GATT server stopping")
await server.stop()
_logger.debug("GATT server stopped")

def _read_request(
self,
characteristic: BlessGATTCharacteristic,
**kwargs
) -> bytearray:
if characteristic.uuid != GATT_CHARACTERISTIC_ID:
raise RuntimeError(f"Invalid characteristic '{characteristic.uuid}'")
_logger.debug(f"Read last input value '{characteristic.value}' for '{characteristic.uuid}'")
return characteristic.value

def _write_request(
self,
characteristic: BlessGATTCharacteristic,
value: bytearray,
**kwargs
):
if characteristic.uuid != GATT_CHARACTERISTIC_ID:
raise RuntimeError(f"Invalid characteristic '{characteristic.uuid}'")
input = value.decode()
_logger.debug(f"Received input '{input}' for '{characteristic.uuid}'")
parsed_input = self._shortcut_parser.parse_command(input, raise_error=not self._partial_parse)
if len(parsed_input) == 0:
_logger.debug(f"Ignoring invalid input '{input}'.")
return

_logger.debug(f"Keys to send: {parsed_input}")
for shortcut in parsed_input:
self._keyboard_gadget.send(*shortcut.keycodes)
characteristic.value = value

_logger.debug(f"Processed input '{input}' for '{characteristic.uuid}'/")


class RelayBleController:
"""
This class serves as a BLE HID relay to handle Bluetooth GATT characteristic write events and translate them to USB.
"""

def __init__(
self,
accept_non_trusted: bool = False,
partial_parse: bool = False) -> None:
self._partial_parse = partial_parse
self._accept_non_trusted = accept_non_trusted

async def async_relay_ble(self) -> NoReturn:
try:
relay = BleRelay(self._accept_non_trusted, self._partial_parse)
_logger.info(f"Activated {relay}. Pairing required: {not self._accept_non_trusted}. Allows invalid input: {self._partial_parse}")
_logger.info(f"Use {GATT_SERVICE_ID} service / {GATT_CHARACTERISTIC_ID} characteristic to send keystrokes.")
await relay.async_relay_events_loop()
except* Exception:
_logger.exception("Error(s) in relay")
Loading