diff --git a/client/drivers/data_structures.py b/client/drivers/data_structures.py index ab0f116..6aafa07 100644 --- a/client/drivers/data_structures.py +++ b/client/drivers/data_structures.py @@ -99,18 +99,6 @@ def make_failed(cls) -> "ControlledData": # SECTION: Getters/Setters - def DEBUG_get_next_posture_graph_value(self) -> int: - """ - Returns next thing to put on the DEBUG graph. - - TODO: Remove this method - """ - return_me = self._DEBUG_current_graph_function( - self._DEBUG_current_graph_list_index - ) - self._DEBUG_current_graph_list_index += 1 - return return_me - def is_failed(self) -> bool: """ Returns True iff this ControlledData is failed. @@ -183,31 +171,6 @@ def accept_new_posture_data( for datum in posture_data: self._posture_data.put_nowait(datum) - # SECTION: Posture data mapping - - def get_cushion_posture_data( - self, - ) -> "CUSHION_POSTURE_DATA": # TODO: Decide what this type looks like - """ - Returns posture data necessary for cushion feedback. - - TODO: Implement this. - """ - print(" WARNING: get_cushion_posture_data() not implemented!") - return None - - def get_plant_posture_data( - self, - ) -> "PLANT_POSTURE_DATA": # TODO: Decide what this type looks like - """ - Returns posture data necessary for plant feedback. - - TODO: Implement this. - """ - print(" WARNING: get_plant_posture_data() not implemented!") - return None - - class HardwareComponents: """ Hardware components packaged together into a class. @@ -467,7 +430,7 @@ def oled_display_texts(self, texts: List[str], x: int, y: int, colour: int) -> i ) return display_height_offset - def send_message(self, message: str, message_time: int = 1) -> None: + def send_message(self, messages: List[str], message_time: int = 1) -> None: """Clear the screen and display message Args: @@ -475,7 +438,11 @@ def send_message(self, message: str, message_time: int = 1) -> None: message_time: Time (seconds) to sleep for after displaying message. """ self.display.fill(0) - self.oled_display_text(message, 0, 0, 1) + display_height_offset = 0 + for text in messages: + display_height_offset = self.oled_display_text( + text, 0, 0 + display_height_offset, 1 + ) self.display.show() time.sleep(message_time) diff --git a/client/drivers/login_system.py b/client/drivers/login_system.py index b076ce7..a095c4c 100644 --- a/client/drivers/login_system.py +++ b/client/drivers/login_system.py @@ -25,7 +25,7 @@ Status.NO_MATCH.value: "Could not match face", Status.ALREADY_REGISTERED.value: "Face already registered", } -QUIT_INSTRUCTIONS = "Right button to quit" +QUIT_INSTRUCTIONS = "Right: quit" Action = Callable[[HardwareComponents], int] @@ -44,9 +44,9 @@ def handle_authentication(hardware: HardwareComponents) -> int: while True: _log_and_send( hardware, - "Left button to login\n" - "Right button to register\n" - "Double press right button to reset data", + ["Left: login", + "Right: register", + "Double press right: reset data"] ) button = hardware.wait_for_button_press() @@ -82,8 +82,8 @@ def _loop_action(hardware: HardwareComponents, action: Action) -> int: def _attempt_login(hardware: HardwareComponents) -> int: capturer = RaspCapturer() - message = f"Press left button to take photo\n{QUIT_INSTRUCTIONS}" - _log_and_send(hardware, message, message_time=0) + messages = ["Left: take photo", f"{QUIT_INSTRUCTIONS}"] + _log_and_send(hardware, messages, message_time=0) button_pressed = hardware.wait_for_button_press() if button_pressed == LEFT_BUTTON: @@ -92,7 +92,7 @@ def _attempt_login(hardware: HardwareComponents) -> int: if button_pressed == RIGHT_BUTTON: return QUIT - _log_and_send(hardware, "Trying login...", message_time=0) + _log_and_send(hardware, ["Trying login..."], message_time=0) status = get_face_match(face) _handle_status_message(hardware, status) @@ -105,11 +105,11 @@ def _attempt_register(hardware: HardwareComponents) -> int: # Capture NUM_FACES faces faces: list[np.ndarray] = [] for i in range(NUM_FACES): - message = ( - f"Press left button to take photo {i + 1}/{NUM_FACES}\n" + messages = [ + f"Left: take photo {i + 1}/{NUM_FACES}", f"{QUIT_INSTRUCTIONS}" - ) - _log_and_send(hardware, message, message_time=0) + ] + _log_and_send(hardware, messages, message_time=0) button_pressed = hardware.wait_for_button_press() if button_pressed == RIGHT_BUTTON: @@ -120,13 +120,13 @@ def _attempt_register(hardware: HardwareComponents) -> int: faces.append(frame) # Try register faces - _log_and_send(hardware, "Registering...") + _log_and_send(hardware, ["Registering..."]) user_id = next_user_id() status = register_faces(user_id, faces) if status == Status.OK.value: create_user() - _log_and_send(hardware, "Registration successful!") + _log_and_send(hardware, ["Registration successful!"]) return user_id _handle_status_message(hardware, status) @@ -144,7 +144,7 @@ def _handle_status_message(hardware: HardwareComponents, status: int) -> None: def _log_and_send( - hardware: HardwareComponents, message: str, message_time: int = 1 + hardware: HardwareComponents, messages: list[str], message_time: int = 1 ) -> None: - logger.debug(message) - hardware.send_message(message, message_time=message_time) + logger.debug(messages) + hardware.send_message(messages, message_time=message_time) diff --git a/client/drivers/main.py b/client/drivers/main.py index 024497b..cc417e6 100644 --- a/client/drivers/main.py +++ b/client/drivers/main.py @@ -65,6 +65,11 @@ #: the plant will move down. #: FIXME: Fine-tune this value later. PLANT_PROPORTION_GOOD_THRESHOLD = 0.5 +""" +Threshold for I. Jensen Plant Mover 10000 feedback. If the proportion of "good" sitting posture is below this, +the plant will move down. +FIXME: Fine-tune this value later. +""" #: DEBUG Number of milliseconds between each loop iteration in do_everything(). DEBUG_DO_EVERYTHING_INTERVAL = 1000 @@ -169,8 +174,6 @@ def do_everything(auspost: ControlledData) -> None: Requires: ! auspost.is_failed() """ - print(" BEGIN do_everything()") - LOGIN_MESSAGE = "Logged in with user id: " + str(auspost.get_user_id()) LOGOUT_MESSAGE = "Logged out user id " + str(auspost.get_user_id()) @@ -197,7 +200,6 @@ def do_everything(auspost: ControlledData) -> None: hardware.display.show() while True: - # Loop invariant: ! auspost.is_failed() # Check for user logout if hardware.button0.was_pressed: hardware.display.fill(0) @@ -208,8 +210,7 @@ def do_everything(auspost: ControlledData) -> None: return update_display_screen(auspost) - # handle_posture_monitoring(auspost) - handle_posture_monitoring_new(auspost) + handle_posture_graph(auspost) handle_feedback(auspost) sleep_ms(DEBUG_DO_EVERYTHING_INTERVAL) @@ -235,8 +236,6 @@ def update_display_screen(auspost: ControlledData) -> bool: Ensures: ! auspost.is_failed() """ - print(" BEGIN update_display_screen()") - while ( not auspost.get_posture_data().empty() ): # NOTE: This is much more robust than getting a fixed number of things out of the queue @@ -249,14 +248,25 @@ def update_display_screen(auspost: ControlledData) -> bool: ) hardware.display.show() - print(" END update_display_screen()") return True -def handle_posture_monitoring_new(auspost: ControlledData) -> bool: +def handle_posture_graph(auspost: ControlledData) -> bool: + """ + Get a snapshot of the user's posture data. + Use this information to update the data for the posture graph. - print(" handle_posture_monitoring_new()") + Args: + (auspost : ControlledData): Data encapsulating the current state of the program. + Returns: + (bool): True, always. If you get a False return value, then something has gone VERY wrong. + Requires: + ! auspost.is_failed() + Ensures: + ! auspost.is_failed() + TODO: Check this + """ now = datetime.now() if now > auspost.get_last_snapshot_time() + GET_POSTURE_DATA_TIMEOUT: @@ -268,14 +278,12 @@ def handle_posture_monitoring_new(auspost: ControlledData) -> bool: period_end=now, ) - # Exit if not enough data - # if len(recent_posture_data) <= POSTURE_GRAPH_DATUM_WIDTH: + # Exit if no data if len(recent_posture_data) == 0: print(" Exiting handle_posture_monitoring_new() early: Not enough data") - # auspost.set_last_snapshot_time(datetime.now()) return True - # Exit if not in frame enough + # Exit if person not in frame enough average_prop_in_frame = sum( [posture.prop_in_frame for posture in recent_posture_data] ) / len(recent_posture_data) @@ -301,11 +309,12 @@ def handle_posture_monitoring_new(auspost: ControlledData) -> bool: # Calculate the interval length interval = total_time / POSTURE_GRAPH_DATUM_WIDTH - # Setup a sublist each representing 1 pixel on the graph + # Setup sublists, where each sublist is a portion of the overall data split_posture_lists: list[list[Posture]] split_posture_lists = [[] for _ in range(POSTURE_GRAPH_DATUM_WIDTH)] - # Sublists will be split by period_start + # What is in each sublist is determined by period_start + # We want an approximately equal amount of data in each sublist for posture in recent_posture_data: index = min( POSTURE_GRAPH_DATUM_WIDTH - 1, @@ -322,10 +331,6 @@ def handle_posture_monitoring_new(auspost: ControlledData) -> bool: average_prop_good = sum( [posture.prop_good for posture in posture_list] ) / len(posture_list) - # KILLME: - # print(f" {average_prop_good=}") - # auspost.accept_new_posture_data([average_prop_good] * DEBUG_MULTIPLIER_CONSTANT) # DEBUG: 2024-10-06_20-16 Gabe: Fixed the typing by wrapping into a singleton list - # print(f" Avg prop_good is {average_prop_good}") new_prop_good_data += [average_prop_good] * POSTURE_GRAPH_DATUM_WIDTH auspost.accept_new_posture_data(new_prop_good_data) @@ -333,33 +338,6 @@ def handle_posture_monitoring_new(auspost: ControlledData) -> bool: return True - -# SECTION: Feedback handling -def handle_posture_monitoring(auspost: ControlledData) -> bool: - """ - Take a snapshot monitoring the user, and update the given ControlledData if necessary. - - Args: - auspost: Data encapsulating the current state of the program. - - Returns: - (bool): True, always. If you get a False return value, then something has gone VERY wrong. - - Requires: - ! auspost.is_failed() - - Ensures: - ! auspost.is_failed() - """ - print(" handle_posture_monitoring()") - now = datetime.now() - if now > auspost.get_last_snapshot_time() + GET_POSTURE_DATA_TIMEOUT: - auspost.accept_new_posture_data([]) - auspost.accept_new_posture_data([auspost.DEBUG_get_next_posture_graph_value()]) - auspost.set_last_snapshot_time(now) - return True - - def handle_feedback(auspost: ControlledData) -> bool: """ Provide feedback to the user if necessary.