diff --git a/README.md b/README.md index 2d78c58..b62ce1a 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,33 @@ # Sitting Desktop Garden +
+ +
+ +The **Sitting Desktop Garden** (SDG) is a cute and customisable artificial potted plant for the home office desk. It monitors the user's posture, providing gentle reminders and gamified incentives to maintain a healthy sitting position as you work. Reminders are delivered through haptic feedback in a vibrating mousepad, which is non-intrusive to the user's workflow, and demonstrating consistently good posture unlocks more beautiful plant growth. + +The SDG is controlled with a Raspberry Pi, which runs all machine learning models and stores all user data locally. No internet connection is required once the Raspberry Pi is set up. + +We use a facial recognition system to facilitate user logins and registrations to allow for multiple users to share one SDG. This can be useful in shared workspaces and offices (hot desking). Once the user is logged in, the camera monitors user posture by tracking their body landmarks and determining their neck and hip angles. + +Real-time feedback is delivered via a vibrating mousepad, which reminds the user to sit up straight if they are not sitting correctly. Current-session feedback can be viewed via the SDG's monitor to show the user how their posture has progressed over the current session, as well as via the physical growth of the potted plant. + +For developers, see [Project Overview](#project-overview). For users setting up a Raspberry Pi for use in the SDG, see [Raspberry Pi Setup](#raspberry-pi-setup). + +--- + **Table of Contents** - [Sitting Desktop Garden](#sitting-desktop-garden) - - [Library Overview](#library-overview) + - [Project Overview](#project-overview) + - [Directory Structure](#directory-structure) + - [Dependencies](#dependencies) - [Development](#development) - [Installation](#installation) - - [Dependencies](#dependencies) + - [Dependencies](#dependencies-1) - [Testing](#testing) - [Code Styling](#code-styling) - [Documentation](#documentation) @@ -14,7 +35,8 @@ - [Environment](#environment) - [Deployment](#deployment) -## Library Overview +## Project Overview +### Directory Structure ``` . ├── client @@ -25,6 +47,12 @@ ├── notebooks: Demos for module use. ``` +### Dependencies +The main project dependencies are specified in [pyproject.toml](./pyproject.toml). Notably: +- [mediapipe](https://ai.google.dev/edge/mediapipe/solutions/guide) provides the body landmark detection model. +- [piicodev](https://pypi.org/project/piicodev/) provides modules for interfacing with Raspberry Pi peripherals. +- [face-recognition](https://pypi.org/project/face-recognition/) provides the face rceognition model. + ## Development ### Installation diff --git a/assets/logo.png b/assets/logo.png new file mode 100644 index 0000000..a7ea224 Binary files /dev/null and b/assets/logo.png differ diff --git a/client/drivers/data_structures.py b/client/drivers/data_structures.py index ab0f116..6aafa07 100644 --- a/client/drivers/data_structures.py +++ b/client/drivers/data_structures.py @@ -99,18 +99,6 @@ def make_failed(cls) -> "ControlledData": # SECTION: Getters/Setters - def DEBUG_get_next_posture_graph_value(self) -> int: - """ - Returns next thing to put on the DEBUG graph. - - TODO: Remove this method - """ - return_me = self._DEBUG_current_graph_function( - self._DEBUG_current_graph_list_index - ) - self._DEBUG_current_graph_list_index += 1 - return return_me - def is_failed(self) -> bool: """ Returns True iff this ControlledData is failed. @@ -183,31 +171,6 @@ def accept_new_posture_data( for datum in posture_data: self._posture_data.put_nowait(datum) - # SECTION: Posture data mapping - - def get_cushion_posture_data( - self, - ) -> "CUSHION_POSTURE_DATA": # TODO: Decide what this type looks like - """ - Returns posture data necessary for cushion feedback. - - TODO: Implement this. - """ - print(" WARNING: get_cushion_posture_data() not implemented!") - return None - - def get_plant_posture_data( - self, - ) -> "PLANT_POSTURE_DATA": # TODO: Decide what this type looks like - """ - Returns posture data necessary for plant feedback. - - TODO: Implement this. - """ - print(" WARNING: get_plant_posture_data() not implemented!") - return None - - class HardwareComponents: """ Hardware components packaged together into a class. @@ -467,7 +430,7 @@ def oled_display_texts(self, texts: List[str], x: int, y: int, colour: int) -> i ) return display_height_offset - def send_message(self, message: str, message_time: int = 1) -> None: + def send_message(self, messages: List[str], message_time: int = 1) -> None: """Clear the screen and display message Args: @@ -475,7 +438,11 @@ def send_message(self, message: str, message_time: int = 1) -> None: message_time: Time (seconds) to sleep for after displaying message. """ self.display.fill(0) - self.oled_display_text(message, 0, 0, 1) + display_height_offset = 0 + for text in messages: + display_height_offset = self.oled_display_text( + text, 0, 0 + display_height_offset, 1 + ) self.display.show() time.sleep(message_time) diff --git a/client/drivers/login_system.py b/client/drivers/login_system.py index fee192f..a095c4c 100644 --- a/client/drivers/login_system.py +++ b/client/drivers/login_system.py @@ -17,14 +17,15 @@ from models.pose_detection.frame_capturer import RaspCapturer NUM_FACES = 5 -QUIT = -4 +QUIT = -6 RESET = -5 BAD_STATUS_MESSAGES = { Status.NO_FACES.value: "No face detected please", Status.TOO_MANY_FACES.value: "Too many faces detected", Status.NO_MATCH.value: "Could not match face", + Status.ALREADY_REGISTERED.value: "Face already registered", } -QUIT_INSTRUCTIONS = "Right button to quit" +QUIT_INSTRUCTIONS = "Right: quit" Action = Callable[[HardwareComponents], int] @@ -43,9 +44,9 @@ def handle_authentication(hardware: HardwareComponents) -> int: while True: _log_and_send( hardware, - "Left button to login\n" - "Right button to register\n" - "Double press right button to reset data", + ["Left: login", + "Right: register", + "Double press right: reset data"] ) button = hardware.wait_for_button_press() @@ -81,8 +82,8 @@ def _loop_action(hardware: HardwareComponents, action: Action) -> int: def _attempt_login(hardware: HardwareComponents) -> int: capturer = RaspCapturer() - message = f"Press left button to take photo\n{QUIT_INSTRUCTIONS}" - _log_and_send(hardware, message, message_time=0) + messages = ["Left: take photo", f"{QUIT_INSTRUCTIONS}"] + _log_and_send(hardware, messages, message_time=0) button_pressed = hardware.wait_for_button_press() if button_pressed == LEFT_BUTTON: @@ -91,7 +92,7 @@ def _attempt_login(hardware: HardwareComponents) -> int: if button_pressed == RIGHT_BUTTON: return QUIT - _log_and_send(hardware, "Trying login...", message_time=0) + _log_and_send(hardware, ["Trying login..."], message_time=0) status = get_face_match(face) _handle_status_message(hardware, status) @@ -104,11 +105,11 @@ def _attempt_register(hardware: HardwareComponents) -> int: # Capture NUM_FACES faces faces: list[np.ndarray] = [] for i in range(NUM_FACES): - message = ( - f"Press left button to take photo {i + 1}/{NUM_FACES}\n" + messages = [ + f"Left: take photo {i + 1}/{NUM_FACES}", f"{QUIT_INSTRUCTIONS}" - ) - _log_and_send(hardware, message, message_time=0) + ] + _log_and_send(hardware, messages, message_time=0) button_pressed = hardware.wait_for_button_press() if button_pressed == RIGHT_BUTTON: @@ -119,13 +120,13 @@ def _attempt_register(hardware: HardwareComponents) -> int: faces.append(frame) # Try register faces - _log_and_send(hardware, "Registering...") + _log_and_send(hardware, ["Registering..."]) user_id = next_user_id() status = register_faces(user_id, faces) if status == Status.OK.value: create_user() - _log_and_send(hardware, "Registration successful!") + _log_and_send(hardware, ["Registration successful!"]) return user_id _handle_status_message(hardware, status) @@ -143,7 +144,7 @@ def _handle_status_message(hardware: HardwareComponents, status: int) -> None: def _log_and_send( - hardware: HardwareComponents, message: str, message_time: int = 1 + hardware: HardwareComponents, messages: list[str], message_time: int = 1 ) -> None: - logger.debug(message) - hardware.send_message(message, message_time=message_time) + logger.debug(messages) + hardware.send_message(messages, message_time=message_time) diff --git a/client/drivers/main.py b/client/drivers/main.py index 024497b..cc417e6 100644 --- a/client/drivers/main.py +++ b/client/drivers/main.py @@ -65,6 +65,11 @@ #: the plant will move down. #: FIXME: Fine-tune this value later. PLANT_PROPORTION_GOOD_THRESHOLD = 0.5 +""" +Threshold for I. Jensen Plant Mover 10000 feedback. If the proportion of "good" sitting posture is below this, +the plant will move down. +FIXME: Fine-tune this value later. +""" #: DEBUG Number of milliseconds between each loop iteration in do_everything(). DEBUG_DO_EVERYTHING_INTERVAL = 1000 @@ -169,8 +174,6 @@ def do_everything(auspost: ControlledData) -> None: Requires: ! auspost.is_failed() """ - print(" BEGIN do_everything()") - LOGIN_MESSAGE = "Logged in with user id: " + str(auspost.get_user_id()) LOGOUT_MESSAGE = "Logged out user id " + str(auspost.get_user_id()) @@ -197,7 +200,6 @@ def do_everything(auspost: ControlledData) -> None: hardware.display.show() while True: - # Loop invariant: ! auspost.is_failed() # Check for user logout if hardware.button0.was_pressed: hardware.display.fill(0) @@ -208,8 +210,7 @@ def do_everything(auspost: ControlledData) -> None: return update_display_screen(auspost) - # handle_posture_monitoring(auspost) - handle_posture_monitoring_new(auspost) + handle_posture_graph(auspost) handle_feedback(auspost) sleep_ms(DEBUG_DO_EVERYTHING_INTERVAL) @@ -235,8 +236,6 @@ def update_display_screen(auspost: ControlledData) -> bool: Ensures: ! auspost.is_failed() """ - print(" BEGIN update_display_screen()") - while ( not auspost.get_posture_data().empty() ): # NOTE: This is much more robust than getting a fixed number of things out of the queue @@ -249,14 +248,25 @@ def update_display_screen(auspost: ControlledData) -> bool: ) hardware.display.show() - print(" END update_display_screen()") return True -def handle_posture_monitoring_new(auspost: ControlledData) -> bool: +def handle_posture_graph(auspost: ControlledData) -> bool: + """ + Get a snapshot of the user's posture data. + Use this information to update the data for the posture graph. - print(" handle_posture_monitoring_new()") + Args: + (auspost : ControlledData): Data encapsulating the current state of the program. + Returns: + (bool): True, always. If you get a False return value, then something has gone VERY wrong. + Requires: + ! auspost.is_failed() + Ensures: + ! auspost.is_failed() + TODO: Check this + """ now = datetime.now() if now > auspost.get_last_snapshot_time() + GET_POSTURE_DATA_TIMEOUT: @@ -268,14 +278,12 @@ def handle_posture_monitoring_new(auspost: ControlledData) -> bool: period_end=now, ) - # Exit if not enough data - # if len(recent_posture_data) <= POSTURE_GRAPH_DATUM_WIDTH: + # Exit if no data if len(recent_posture_data) == 0: print(" Exiting handle_posture_monitoring_new() early: Not enough data") - # auspost.set_last_snapshot_time(datetime.now()) return True - # Exit if not in frame enough + # Exit if person not in frame enough average_prop_in_frame = sum( [posture.prop_in_frame for posture in recent_posture_data] ) / len(recent_posture_data) @@ -301,11 +309,12 @@ def handle_posture_monitoring_new(auspost: ControlledData) -> bool: # Calculate the interval length interval = total_time / POSTURE_GRAPH_DATUM_WIDTH - # Setup a sublist each representing 1 pixel on the graph + # Setup sublists, where each sublist is a portion of the overall data split_posture_lists: list[list[Posture]] split_posture_lists = [[] for _ in range(POSTURE_GRAPH_DATUM_WIDTH)] - # Sublists will be split by period_start + # What is in each sublist is determined by period_start + # We want an approximately equal amount of data in each sublist for posture in recent_posture_data: index = min( POSTURE_GRAPH_DATUM_WIDTH - 1, @@ -322,10 +331,6 @@ def handle_posture_monitoring_new(auspost: ControlledData) -> bool: average_prop_good = sum( [posture.prop_good for posture in posture_list] ) / len(posture_list) - # KILLME: - # print(f" {average_prop_good=}") - # auspost.accept_new_posture_data([average_prop_good] * DEBUG_MULTIPLIER_CONSTANT) # DEBUG: 2024-10-06_20-16 Gabe: Fixed the typing by wrapping into a singleton list - # print(f" Avg prop_good is {average_prop_good}") new_prop_good_data += [average_prop_good] * POSTURE_GRAPH_DATUM_WIDTH auspost.accept_new_posture_data(new_prop_good_data) @@ -333,33 +338,6 @@ def handle_posture_monitoring_new(auspost: ControlledData) -> bool: return True - -# SECTION: Feedback handling -def handle_posture_monitoring(auspost: ControlledData) -> bool: - """ - Take a snapshot monitoring the user, and update the given ControlledData if necessary. - - Args: - auspost: Data encapsulating the current state of the program. - - Returns: - (bool): True, always. If you get a False return value, then something has gone VERY wrong. - - Requires: - ! auspost.is_failed() - - Ensures: - ! auspost.is_failed() - """ - print(" handle_posture_monitoring()") - now = datetime.now() - if now > auspost.get_last_snapshot_time() + GET_POSTURE_DATA_TIMEOUT: - auspost.accept_new_posture_data([]) - auspost.accept_new_posture_data([auspost.DEBUG_get_next_posture_graph_value()]) - auspost.set_last_snapshot_time(now) - return True - - def handle_feedback(auspost: ControlledData) -> bool: """ Provide feedback to the user if necessary. diff --git a/client/models/face_recognition/recognition.py b/client/models/face_recognition/recognition.py index 043f8a3..b322fbb 100644 --- a/client/models/face_recognition/recognition.py +++ b/client/models/face_recognition/recognition.py @@ -9,6 +9,7 @@ class Status(Enum): + ALREADY_REGISTERED = -4 NO_FACES = -3 TOO_MANY_FACES = -2 NO_MATCH = -1 @@ -72,6 +73,14 @@ def register_faces(user_id: int, faces: list[np.ndarray]) -> int: if not all(matches): return Status.TOO_MANY_FACES.value + # Ensure user is not already registered + for _, other_user_embeddings in iter_face_embeddings(): + for embedding in face_embeddings: + matches = face_recognition.compare_faces(other_user_embeddings, embedding) + + if any(matches): + return Status.ALREADY_REGISTERED.value + register_face_embeddings(user_id, face_embeddings) return Status.OK.value