Skip to content

Commit

Permalink
Switch from pynput to libvinput (#134)
Browse files Browse the repository at this point in the history
* Switch from pynput to libvinput

Signed-off-by: Slendi <[email protected]>

* Change modifier CLI args

* Fix: Update timing after adding key to word

Edge case where a non-allowed char key (e.g. number) typed fast enough to be considered a chord is added as the first character of a word, and timing is not initialized. When a non-chord key (e.g. modifier key) is then pressed (to end the word), the word is logged but there is no timing associated with it, which would cause a crash.

* Fix: device chords format

---------

Signed-off-by: Slendi <[email protected]>
Signed-off-by: Raymond Li <[email protected]>
Co-authored-by: Raymond Li <[email protected]>
  • Loading branch information
xslendix and Raymo111 authored Aug 15, 2024
1 parent efa88a7 commit fdfaa2b
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 86 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ jobs:
pip install build twine
pip install -r requirements.txt
pip install -r test-requirements.txt
sudo apt-get install xvfb
sudo apt-get install xvfb libx11-dev libxcb-xtest0 libxdo3
- name: Lint with flake8
run: |
flake8 --count --show-source --statistics --max-line-length=120 \
Expand Down
4 changes: 1 addition & 3 deletions dist.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,7 @@ def run_command(command: str):

if not (args.no_build or args.ui_only):
# Pyinstaller command
build_cmd = "pyinstaller --onefile --name nexus nexus/__main__.py --icon ui/images/icon.ico"
if os_name == "notwin": # Add hidden imports for Linux
build_cmd += " --hidden-import pynput.keyboard._xorg --hidden-import pynput.mouse._xorg"
build_cmd = "pyinstaller --onefile --name nexus nexus/__main__.py --icon ui/images/icon.ico --collect-all vinput"

if os_name == "win":
print("Building windowed executable...")
Expand Down
13 changes: 9 additions & 4 deletions nexus/Freqlog/Definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,8 @@
from enum import Enum
from typing import Any, Self

from pynput.keyboard import Key

from nexus import __author__
import vinput


class Defaults:
Expand All @@ -15,9 +14,10 @@ class Defaults:
{chr(i) for i in range(ord('a'), ord('z') + 1)} | {chr(i) for i in range(ord('A'), ord('Z') + 1)}
DEFAULT_ALLOWED_CHARS: set = \
DEFAULT_ALLOWED_FIRST_CHARS | {"'", "-", "_", "/", "~"} # | {chr(i) for i in range(ord('0'), ord('9') + 1)}
DEFAULT_MODIFIERS: set = [
'left_control', 'left_alt', 'left_meta', 'left_super', 'left_hyper',
'right_control', 'right_alt', 'right_meta', 'right_super', 'right_hyper']
# TODO: uncomment above line when first char detection is implemented
DEFAULT_MODIFIER_KEYS: set = {Key.ctrl, Key.ctrl_l, Key.ctrl_r, Key.alt, Key.alt_l, Key.alt_r, Key.alt_gr, Key.cmd,
Key.cmd_l, Key.cmd_r}
DEFAULT_NEW_WORD_THRESHOLD: float = 5 # seconds after which character input is considered a new word
DEFAULT_CHORD_CHAR_THRESHOLD: int = 5 # milliseconds between characters in a chord to be considered a chord
DEFAULT_DB_FILE: str = "nexus_freqlog_db.sqlite3"
Expand All @@ -39,6 +39,11 @@ class Defaults:
else: # Fallback (unknown platform)
DEFAULT_DB_PATH = DEFAULT_DB_FILE

# The names of all available modifiers in libvinput.
MODIFIER_NAMES = [
a for a in dir(vinput.KeyboardModifiers)
if not a.startswith('_') and type(getattr(vinput.KeyboardModifiers, a)).__name__ == "CField"]

# Create directory if it doesn't exist
os.makedirs(os.path.dirname(DEFAULT_DB_PATH), exist_ok=True)

Expand Down
165 changes: 101 additions & 64 deletions nexus/Freqlog/Freqlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
from queue import Empty as EmptyException, Queue
from threading import Thread
from typing import Optional
import sys

from charachorder import CharaChorder, SerialException
from pynput import keyboard as kbd, mouse
import vinput

from .backends import Backend, SQLiteBackend
from .Definitions import ActionType, BanlistAttr, BanlistEntry, CaseSensitivity, ChordMetadata, ChordMetadataAttr, \
Expand All @@ -16,18 +17,20 @@

class Freqlog:

def _on_press(self, key: kbd.Key | kbd.KeyCode) -> None:
"""Store PRESS, key and current time in queue"""
self.q.put((ActionType.PRESS, key, datetime.now()))

def _on_release(self, key: kbd.Key | kbd.KeyCode) -> None:
""""Store RELEASE, key and current time in queue"""
if key in self.modifier_keys:
self.q.put((ActionType.RELEASE, key, datetime.now()))
def _on_key(self, key: vinput.KeyboardEvent) -> None:
kind = ActionType.PRESS
if not key.pressed:
kind = ActionType.RELEASE
if key.keychar == '' or key.keychar == '\0':
return
self.q.put((kind, key.keychar.lower(), key.modifiers, datetime.now()))

def _on_click(self, _x, _y, button: mouse.Button, _pressed) -> None:
def _on_mouse_button(self, button: vinput.MouseButtonEvent) -> None:
"""Store PRESS, key and current time in queue"""
self.q.put((ActionType.PRESS, button, datetime.now()))
self.q.put((ActionType.PRESS, button, None, datetime.now()))

def _on_mouse_move(self, move: vinput.MouseMoveEvent) -> None:
pass

def _log_word(self, word: str, start_time: datetime, end_time: datetime) -> None:
"""
Expand Down Expand Up @@ -59,14 +62,20 @@ def _log_chord(self, chord: str, start_time: datetime, end_time: datetime) -> No
logging.info(f"Banned chord, {end_time}")
logging.debug(f"(Banned chord was '{chord}')")

# This checks if the event is either a valid key or if it should be treated as mouse input/modifier key press.
@staticmethod
def _is_key(x: str | vinput.MouseButtonEvent) -> bool:
if isinstance(x, vinput.MouseButtonEvent):
return False
return x != '' and x != '\0'

def _process_queue(self):
word: str = "" # word to be logged, reset on criteria below
word_start_time: datetime | None = None
word_end_time: datetime | None = None
chars_since_last_bs: int = 0
avg_char_time_after_last_bs: timedelta | None = None
last_key_was_disallowed: bool = False
active_modifier_keys: set = set()

def _get_timed_interruptable(q, timeout):
# Based on https://stackoverflow.com/a/37016663/9206488
Expand Down Expand Up @@ -116,30 +125,49 @@ def _log_and_reset_word(min_length: int = 2) -> None:
avg_char_time_after_last_bs = None
last_key_was_disallowed = False

def _update_timing():
"""Must be called after adding a key to word and before self.q.task_done()"""
nonlocal word_start_time, word_end_time, last_key_was_disallowed, chars_since_last_bs, \
avg_char_time_after_last_bs
if not word_start_time:
word_start_time = time_pressed
elif chars_since_last_bs > 1 and avg_char_time_after_last_bs:
# Should only get here if chars_since_last_bs > 2
avg_char_time_after_last_bs = (avg_char_time_after_last_bs * (chars_since_last_bs - 1) +
(time_pressed - word_end_time)) / chars_since_last_bs
elif chars_since_last_bs > 1:
avg_char_time_after_last_bs = time_pressed - word_end_time
word_end_time = time_pressed

while self.is_logging:
try:
action: ActionType
key: kbd.Key | kbd.KeyCode | mouse.Button
key: str | vinput.MouseButtonEvent
modifiers: vinput.KeyboardModifiers | None
time_pressed: datetime

# Blocking here makes the while-True non-blocking
action, key, time_pressed = _get_timed_interruptable(self.q, self.new_word_threshold)
action, key, modifiers, time_pressed = _get_timed_interruptable(self.q, self.new_word_threshold)

if isinstance(key, bytes):
key = key.decode('utf-8')

# Debug keystrokes
if isinstance(key, kbd.Key) or isinstance(key, kbd.KeyCode):
if self._is_key(key):
logging.debug(f"{action}: {key} - {time_pressed}")
logging.debug(f"word: '{word}', active_modifier_keys: {active_modifier_keys}")
logging.debug(f"word: '{word}', modifiers: {modifiers}")

# Update modifier keys
if action == ActionType.PRESS and key in self.modifier_keys:
active_modifier_keys.add(key)
elif action == ActionType.RELEASE:
active_modifier_keys.discard(key)
if action == ActionType.RELEASE:
continue

# On backspace, remove last char from word if word is not empty
if key == kbd.Key.backspace and word:
if active_modifier_keys.intersection({kbd.Key.ctrl, kbd.Key.ctrl_l, kbd.Key.ctrl_r,
kbd.Key.cmd, kbd.Key.cmd_l, kbd.Key.cmd_r}):
if key == '\b' and word:
if sys.platform == 'darwin':
word_del_cond = modifiers.left_alt or modifiers.right_alt
else:
word_del_cond = modifiers.left_control or modifiers.right_control

if word_del_cond:
# Remove last word from word
# FIXME: make this work - rn _log_and_reset_word() is called immediately upon ctrl/cmd keydown
# TODO: make this configurable (i.e. for vim, etc)
Expand All @@ -160,61 +188,57 @@ def _log_and_reset_word(min_length: int = 2) -> None:
continue

# Handle whitespace/disallowed keys
if ((isinstance(key, kbd.Key) and key in {kbd.Key.space, kbd.Key.tab, kbd.Key.enter}) or
(isinstance(key, kbd.KeyCode) and (not key.char or key.char not in self.allowed_chars))):
if self._is_key(key) and (key in " \t\n\r" or key not in self.allowed_chars):
# If key is whitespace/disallowed and timing is more than chord_char_threshold, log and reset word
if (word and avg_char_time_after_last_bs and
avg_char_time_after_last_bs > timedelta(milliseconds=self.chord_char_threshold)):
logging.debug(f"Whitespace/disallowed, log+reset: {word}")
_log_and_reset_word()
else: # Add key to chord
match key:
case kbd.Key.space:
word += " "
case kbd.Key.tab:
word += "\t"
case kbd.Key.enter:
word += "\n"
case _:
if isinstance(key, kbd.KeyCode) and key.char:
word += key.char
if self._is_key(key):
word += key
_update_timing()
last_key_was_disallowed = True
self.q.task_done()
continue

# On non-chord key, log and reset word if it exists
# Non-chord key = key in modifier keys or non-key
# FIXME: support modifier keys in chords
if key in self.modifier_keys or not (isinstance(key, kbd.Key) or isinstance(key, kbd.KeyCode)):
if not self._is_key(key):
logging.debug(f"Non-chord key: {key}")
if word:
_log_and_reset_word()
self.q.task_done()
continue

# Add new char to word and update word timing if no modifier keys are pressed
if isinstance(key, kbd.KeyCode) and not active_modifier_keys and key.char:
banned_modifier_active = False
for attr in Defaults.MODIFIER_NAMES:
# Skip non-enabled modifiers
if not getattr(self.modifier_keys, attr):
continue

if getattr(modifiers, attr):
banned_modifier_active = True
break

# Add new char to word and update word timing if no banned modifier keys are pressed
if not banned_modifier_active:
# I think this is for chords that end in space
# If last key was disallowed and timing of this key is more than chord_char_threshold, log+reset
if (last_key_was_disallowed and word and word_end_time and
(time_pressed - word_end_time) > timedelta(milliseconds=self.chord_char_threshold)):
logging.debug(f"Disallowed and timing, log+reset: {word}")
_log_and_reset_word()
word += key.char
word += key
chars_since_last_bs += 1

# TODO: code below potentially needs to be copied to edge cases above
if not word_start_time:
word_start_time = time_pressed
elif chars_since_last_bs > 1 and avg_char_time_after_last_bs:
# Should only get here if chars_since_last_bs > 2
avg_char_time_after_last_bs = (avg_char_time_after_last_bs * (chars_since_last_bs - 1) +
(time_pressed - word_end_time)) / chars_since_last_bs
elif chars_since_last_bs > 1:
avg_char_time_after_last_bs = time_pressed - word_end_time
word_end_time = time_pressed
_update_timing()
self.q.task_done()
continue

# Should never get here
logging.error(f"Uncaught key: {key}")
self.q.task_done()
except EmptyException: # Queue is empty
# If word is older than NEW_WORD_THRESHOLD seconds, log and reset word
if word:
Expand All @@ -237,7 +261,7 @@ def _get_chords(self):
self.chords = []
started_logging = False # prevent early short-circuit
for chord, phrase in self.device.get_chordmaps():
self.chords.append(str(phrase).strip())
self.chords.append(''.join(phrase).strip())
if not self.is_logging: # Short circuit if logging is stopped
if started_logging:
logging.info("Stopped getting chords from device")
Expand Down Expand Up @@ -298,6 +322,7 @@ def __init__(self, backend_path: str, password_callback: callable, loggable: boo
logging.error(e)

self.is_logging: bool = False # Used in self._get_chords, needs to be initialized here
self.loggable = loggable
if loggable:
logging.info(f"Logging set to freqlog db at {backend_path}")

Expand All @@ -306,21 +331,23 @@ def __init__(self, backend_path: str, password_callback: callable, loggable: boo

self.backend: Backend = SQLiteBackend(backend_path, password_callback, upgrade_callback)
self.q: Queue = Queue()
self.listener: kbd.Listener | None = None
self.mouse_listener: mouse.Listener | None = None
if loggable:
self.listener = kbd.Listener(on_press=self._on_press, on_release=self._on_release, name="Keyboard Listener")
self.mouse_listener = mouse.Listener(on_click=self._on_click, name="Mouse Listener")
self.listener: vinput.EventListener | None = None
self.listener_thread = Thread(target=lambda: self._log_start())
self.new_word_threshold: float = Defaults.DEFAULT_NEW_WORD_THRESHOLD
self.chord_char_threshold: int = Defaults.DEFAULT_CHORD_CHAR_THRESHOLD
self.allowed_chars: set = Defaults.DEFAULT_ALLOWED_CHARS
self.allowed_first_chars: set = Defaults.DEFAULT_ALLOWED_FIRST_CHARS
self.modifier_keys: set = Defaults.DEFAULT_MODIFIER_KEYS
self.modifier_keys: vinput.KeyboardModifiers = vinput.KeyboardModifiers()
for attr in Defaults.DEFAULT_MODIFIERS:
setattr(self.modifier_keys, attr, True)
self.killed: bool = False

def start_logging(self, new_word_threshold: float | None = None, chord_char_threshold: int | None = None,
allowed_chars: set | str | None = None, allowed_first_chars: set | str | None = None,
modifier_keys: set = None) -> None:
modifier_keys: vinput.KeyboardModifiers | None = None) -> None:
if not self.loggable:
return

if isinstance(allowed_chars, set):
self.allowed_chars = allowed_chars
elif isinstance(allowed_chars, str):
Expand All @@ -342,21 +369,31 @@ def start_logging(self, new_word_threshold: float | None = None, chord_char_thre
f"allowed_chars={self.allowed_chars}, "
f"allowed_first_chars={self.allowed_first_chars}, "
f"modifier_keys={self.modifier_keys}")
self.listener.start()
self.mouse_listener.start()

self.listener_thread.start()
self.is_logging = True
logging.warning("Started freqlogging")
self._process_queue()

def _log_start(self):
self.listener = vinput.EventListener(True, True, True)

try:
self.listener.start(
lambda x: self._on_key(x),
lambda x: self._on_mouse_button(x),
lambda x: self._on_mouse_move(x))
except vinput.VInputException as e:
logging.error("Failed to start listeners: " + str(e))

def stop_logging(self) -> None: # FIXME: find out why this runs twice on one Ctrl-C (does it still?)
if self.killed: # TODO: Forcibly kill if already killed once
exit(1) # This doesn't work rn
self.killed = True
logging.warning("Stopping freqlog")
if self.listener:
self.listener.stop()
if self.mouse_listener:
self.mouse_listener.stop()
del self.listener
self.listener = None
self.is_logging = False
logging.info("Stopped listeners")

Expand Down
21 changes: 11 additions & 10 deletions nexus/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import sys

from getpass import getpass
from pynput import keyboard
import vinput

from nexus import __doc__, __version__
from nexus.Freqlog import Freqlog
Expand Down Expand Up @@ -78,13 +78,10 @@ def main():
parser_start.add_argument("--allowed-first-chars",
default=Defaults.DEFAULT_ALLOWED_FIRST_CHARS,
help="Chars to be considered as the first char in words")
parser_start.add_argument("--add-modifier-key", action="append", default=[],
help="Add a modifier key to the default set",
choices=sorted(key.name for key in set(keyboard.Key) - Defaults.DEFAULT_MODIFIER_KEYS))
parser_start.add_argument("--remove-modifier-key", action="append", default=[],
help="Remove a modifier key from the default set",
choices=sorted(key.name for key in Defaults.DEFAULT_MODIFIER_KEYS))

parser_start.add_argument("--modifier-keys", default=Defaults.DEFAULT_MODIFIERS,
help="Specify which modifier keys to use",
choices=Defaults.MODIFIER_NAMES,
nargs='+')
# Num words
subparsers.add_parser("numwords", help="Get number of words in freqlog",
parents=[log_arg, path_arg, case_arg, upgrade_arg])
Expand Down Expand Up @@ -346,10 +343,14 @@ def _prompt_for_password(new: bool, desc: str = "") -> str:
except Exception as e:
logging.error(e)
sys.exit(4)
mods = vinput.KeyboardModifiers()
logging.debug('Activated modifier keys:')
for mod in args.modifier_keys:
logging.debug(' - ' + str(mod))
setattr(mods, mod, True)
signal.signal(signal.SIGINT, lambda _: freqlog.stop_logging())
freqlog.start_logging(args.new_word_threshold, args.chord_char_threshold, args.allowed_chars,
args.allowed_first_chars, Defaults.DEFAULT_MODIFIER_KEYS -
set(args.remove_modifier_key) | set(args.add_modifier_key))
args.allowed_first_chars, mods)
case "checkword": # Check if word is banned
for word in args.word:
if freqlog.check_banned(word):
Expand Down
Loading

0 comments on commit fdfaa2b

Please sign in to comment.