From 6e9877f07013815ff414ccedc465501e37d86d51 Mon Sep 17 00:00:00 2001 From: polypies73 <112163783+polypies73@users.noreply.github.com> Date: Thu, 10 Oct 2024 17:36:11 +1000 Subject: [PATCH 1/4] Update main.py Got rid of most of the useless debug comments and unused functions. --- client/drivers/main.py | 151 ++++++----------------------------------- 1 file changed, 21 insertions(+), 130 deletions(-) diff --git a/client/drivers/main.py b/client/drivers/main.py index d2da545..9945e30 100644 --- a/client/drivers/main.py +++ b/client/drivers/main.py @@ -76,10 +76,6 @@ FIXME: Fine-tune this value later. """ -# KILLME: -HANDLE_SNIFF_FEEDBACK_TIMEOUT = timedelta(milliseconds=20000) -""" Minimum delay between consecutive uses of the scent bottle-controlling servos. Used in handle_feedback(). """ - DEBUG_DO_EVERYTHING_INTERVAL = 1000 """ DEBUG Number of milliseconds between each loop iteration in do_everything(). """ @@ -143,10 +139,7 @@ def initialise_hardware() -> HardwareComponents: Returns: (HardwareComponents): Object consisting of all hardware components connected to the Raspberry Pi. - - TODO: Complete the function with all of the hardware peripherals (incrementally, as they get integrated). """ - print(" initialise_hardware()") # DEBUG return_me = HardwareComponents.make_fresh() # Clear button queues return_me.button0.was_pressed @@ -157,7 +150,6 @@ def initialise_hardware() -> HardwareComponents: # Write low to stop buzzer from mistakenly buzzing, if necessary GPIO.output(CUSHION_GPIO_PIN, GPIO.LOW) - print(" initialise_hardware() FINISHED") # DEBUG return return_me @@ -175,11 +167,7 @@ def do_everything(auspost: ControlledData) -> None: (auspost : ControlledData): Data encapsulating the current state of the program. Requires: ! auspost.is_failed() - - TODO: Actually implement this """ - print(" BEGIN do_everything()") - LOGIN_MESSAGE = "Logged in with user id: " + str(auspost.get_user_id()) LOGOUT_MESSAGE = "Logged out user id " + str(auspost.get_user_id()) @@ -206,7 +194,6 @@ def do_everything(auspost: ControlledData) -> None: hardware.display.show() while True: - # Loop invariant: ! auspost.is_failed() # Check for user logout if hardware.button0.was_pressed: hardware.display.fill(0) @@ -217,8 +204,7 @@ def do_everything(auspost: ControlledData) -> None: return update_display_screen(auspost) - # handle_posture_monitoring(auspost) - handle_posture_monitoring_new(auspost) + handle_posture_graph(auspost) handle_feedback(auspost) sleep_ms(DEBUG_DO_EVERYTHING_INTERVAL) @@ -242,8 +228,6 @@ def update_display_screen(auspost: ControlledData) -> bool: Ensures: ! auspost.is_failed() """ - print(" BEGIN update_display_screen()") - while ( not auspost.get_posture_data().empty() ): # NOTE: This is much more robust than getting a fixed number of things out of the queue @@ -256,16 +240,25 @@ def update_display_screen(auspost: ControlledData) -> bool: ) hardware.display.show() - print(" END update_display_screen()") return True -def handle_posture_monitoring_new(auspost: ControlledData) -> bool: +def handle_posture_graph(auspost: ControlledData) -> bool: + """ + Get a snapshot of the user's posture data. + Use this information to update the data for the posture graph. - # DEBUG: - print(" handle_posture_monitoring_new()") - # :DEBUG + Args: + (auspost : ControlledData): Data encapsulating the current state of the program. + Returns: + (bool): True, always. If you get a False return value, then something has gone VERY wrong. + Requires: + ! auspost.is_failed() + Ensures: + ! auspost.is_failed() + TODO: Check this + """ now = datetime.now() if now > auspost.get_last_snapshot_time() + GET_POSTURE_DATA_TIMEOUT: @@ -277,16 +270,12 @@ def handle_posture_monitoring_new(auspost: ControlledData) -> bool: period_end=now, ) - # DEBUG:: - # Exit if not enough data - # if len(recent_posture_data) <= POSTURE_GRAPH_DATUM_WIDTH: + # Exit if no data if len(recent_posture_data) == 0: - # ::DEBUG print(" Exiting handle_posture_monitoring_new() early: Not enough data") - # auspost.set_last_snapshot_time(datetime.now()) return True - # Exit if not in frame enough + # Exit if person not in frame enough average_prop_in_frame = sum( [posture.prop_in_frame for posture in recent_posture_data] ) / len(recent_posture_data) @@ -310,11 +299,12 @@ def handle_posture_monitoring_new(auspost: ControlledData) -> bool: # Calculate the interval length interval = total_time / POSTURE_GRAPH_DATUM_WIDTH - # Setup a sublist each representing 1 pixel on the graph + # Setup sublists, where each sublist is a portion of the overall data split_posture_lists: list[list[Posture]] split_posture_lists = [[] for _ in range(POSTURE_GRAPH_DATUM_WIDTH)] - # Sublists will be split by period_start + # What is in each sublist is determined by period_start + # We want an approximately equal amount of data in each sublist for posture in recent_posture_data: index = min( POSTURE_GRAPH_DATUM_WIDTH - 1, @@ -331,10 +321,6 @@ def handle_posture_monitoring_new(auspost: ControlledData) -> bool: average_prop_good = sum( [posture.prop_good for posture in posture_list] ) / len(posture_list) - # KILLME: - # print(f" {average_prop_good=}") - # auspost.accept_new_posture_data([average_prop_good] * DEBUG_MULTIPLIER_CONSTANT) # DEBUG: 2024-10-06_20-16 Gabe: Fixed the typing by wrapping into a singleton list - # print(f" Avg prop_good is {average_prop_good}") new_prop_good_data += [average_prop_good] * POSTURE_GRAPH_DATUM_WIDTH auspost.accept_new_posture_data(new_prop_good_data) @@ -342,68 +328,6 @@ def handle_posture_monitoring_new(auspost: ControlledData) -> bool: return True - -# KILLME: -def handle_posture_monitoring(auspost: ControlledData) -> bool: - """ - Take a snapshot monitoring the user, and update the given ControlledData if necessary. - - Args: - (auspost : ControlledData): Data encapsulating the current state of the program. - Returns: - (bool): True, always. If you get a False return value, then something has gone VERY wrong. - Requires: - ! auspost.is_failed() - Ensures: - ! auspost.is_failed() - - TODO: Implement error handling - WARNING: UNTESTED! - """ - # DEBUG: - print(" handle_posture_monitoring()") - # :DEBUG - now = datetime.now() - if now > auspost.get_last_snapshot_time() + GET_POSTURE_DATA_TIMEOUT: - # TODO: The ai_bros_get_posture_data() call might fail once it's implemented properly. - # If it does, we need to handle it properly. - auspost.accept_new_posture_data([]) - # DEBUG: - auspost.accept_new_posture_data([auspost.DEBUG_get_next_posture_graph_value()]) - # :DEBUG - auspost.set_last_snapshot_time(now) - return True - - -def handle_feedback(auspost: ControlledData) -> bool: - """ - Provide feedback to the user if necessary. - - Args: - (auspost : ControlledData): Data encapsulating the current state of the program. - Returns: - (bool): True, always. If you get a False return value, then something has gone VERY wrong. - Requires: - ! auspost.is_failed() - Ensures: - ! auspost.is_failed() - """ - if ( - datetime.now() - > auspost.get_last_cushion_time() + HANDLE_CUSHION_FEEDBACK_TIMEOUT - ): - if not handle_cushion_feedback(auspost): - return False - if datetime.now() > auspost.get_last_plant_time() + HANDLE_PLANT_FEEDBACK_TIMEOUT: - if not handle_plant_feedback(auspost): - return False - if datetime.now() > auspost.get_last_sniff_time() + HANDLE_SNIFF_FEEDBACK_TIMEOUT: - if not handle_sniff_feedback(auspost): - return False - - return True - - ## SECTION: Feedback handling @@ -420,13 +344,7 @@ def handle_cushion_feedback(auspost: ControlledData) -> bool: ! auspost.is_failed() Ensures: ! auspost.is_failed() - - TODO: Implement this method. Currently prints a debug statement and updates the time. """ - # DEBUG: - print(" handle_cushion_feedback()") - # :DEBUG - # Load posture records within the last HANDLE_CUSHION_FEEDBACK_TIMEOUT now = datetime.now() recent_posture_data = get_user_postures( @@ -442,6 +360,7 @@ def handle_cushion_feedback(auspost: ControlledData) -> bool: print(" Exiting handle_cushion_feedback() early: No data") auspost.set_last_cushion_time(datetime.now()) return True + # 2024-09-15_20-18 Gabe: TESTED. average_prop_in_frame = sum( [posture.prop_in_frame for posture in recent_posture_data] @@ -491,10 +410,6 @@ def handle_plant_feedback(auspost: ControlledData) -> bool: TODO: Implement this method. Currently prints a debug statement and updates the time. """ - # DEBUG: - print(" handle_plant_feedback()") - # :DEBUG - now = datetime.now() if now > auspost.get_last_plant_time() + HANDLE_PLANT_FEEDBACK_TIMEOUT: @@ -537,30 +452,6 @@ def handle_plant_feedback(auspost: ControlledData) -> bool: return True - -def handle_sniff_feedback(auspost: ControlledData) -> bool: - """ - Dispense olfactory reward (if necessary), and update the timestamp of when olfactory feedback - was last given. - - Args: - (auspost : ControlledData): Data encapsulating the current state of the program. - Returns: - (bool): True, always. If you get a False return value, then something has gone VERY wrong. - Requires: - ! auspost.is_failed() - Ensures: - ! auspost.is_failed() - - TODO: Implement this method. Currently prints a debug statement and updates the time. - """ - # DEBUG: - print(" handle_sniff_feedback()") - # :DEBUG - auspost.set_last_sniff_time(datetime.now()) - return True - - def _reset_garden() -> None: """Reset data, faces and hardware.""" print(" Burning the garden to the ground...") From e4a0a4a77540521d994af1081da2ea1dbc1a23ef Mon Sep 17 00:00:00 2001 From: polypies73 <112163783+polypies73@users.noreply.github.com> Date: Thu, 10 Oct 2024 21:42:59 +1000 Subject: [PATCH 2/4] fixing deleted smth i shouldn't have, fixed --- client/drivers/main.py | 61 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 60 insertions(+), 1 deletion(-) diff --git a/client/drivers/main.py b/client/drivers/main.py index 9945e30..ffeb0e9 100644 --- a/client/drivers/main.py +++ b/client/drivers/main.py @@ -139,6 +139,8 @@ def initialise_hardware() -> HardwareComponents: Returns: (HardwareComponents): Object consisting of all hardware components connected to the Raspberry Pi. + + TODO: Complete the function with all of the hardware peripherals (incrementally, as they get integrated). """ return_me = HardwareComponents.make_fresh() # Clear button queues @@ -328,6 +330,32 @@ def handle_posture_graph(auspost: ControlledData) -> bool: return True +def handle_feedback(auspost: ControlledData) -> bool: + """ + Provide feedback to the user if necessary. + + Args: + (auspost : ControlledData): Data encapsulating the current state of the program. + Returns: + (bool): True, always. If you get a False return value, then something has gone VERY wrong. + Requires: + ! auspost.is_failed() + Ensures: + ! auspost.is_failed() + """ + if ( + datetime.now() + > auspost.get_last_cushion_time() + HANDLE_CUSHION_FEEDBACK_TIMEOUT + ): + if not handle_cushion_feedback(auspost): + return False + if datetime.now() > auspost.get_last_plant_time() + HANDLE_PLANT_FEEDBACK_TIMEOUT: + if not handle_plant_feedback(auspost): + return False + + return True + + ## SECTION: Feedback handling @@ -345,6 +373,10 @@ def handle_cushion_feedback(auspost: ControlledData) -> bool: Ensures: ! auspost.is_failed() """ + # DEBUG: + print(" handle_cushion_feedback()") + # :DEBUG + # Load posture records within the last HANDLE_CUSHION_FEEDBACK_TIMEOUT now = datetime.now() recent_posture_data = get_user_postures( @@ -360,7 +392,6 @@ def handle_cushion_feedback(auspost: ControlledData) -> bool: print(" Exiting handle_cushion_feedback() early: No data") auspost.set_last_cushion_time(datetime.now()) return True - # 2024-09-15_20-18 Gabe: TESTED. average_prop_in_frame = sum( [posture.prop_in_frame for posture in recent_posture_data] @@ -410,6 +441,10 @@ def handle_plant_feedback(auspost: ControlledData) -> bool: TODO: Implement this method. Currently prints a debug statement and updates the time. """ + # DEBUG: + print(" handle_plant_feedback()") + # :DEBUG + now = datetime.now() if now > auspost.get_last_plant_time() + HANDLE_PLANT_FEEDBACK_TIMEOUT: @@ -452,6 +487,30 @@ def handle_plant_feedback(auspost: ControlledData) -> bool: return True + +def handle_sniff_feedback(auspost: ControlledData) -> bool: + """ + Dispense olfactory reward (if necessary), and update the timestamp of when olfactory feedback + was last given. + + Args: + (auspost : ControlledData): Data encapsulating the current state of the program. + Returns: + (bool): True, always. If you get a False return value, then something has gone VERY wrong. + Requires: + ! auspost.is_failed() + Ensures: + ! auspost.is_failed() + + TODO: Implement this method. Currently prints a debug statement and updates the time. + """ + # DEBUG: + print(" handle_sniff_feedback()") + # :DEBUG + auspost.set_last_sniff_time(datetime.now()) + return True + + def _reset_garden() -> None: """Reset data, faces and hardware.""" print(" Burning the garden to the ground...") From 58ea5f176440af0b9573b52fe044a9e9c920ec85 Mon Sep 17 00:00:00 2001 From: polypies73 <112163783+polypies73@users.noreply.github.com> Date: Thu, 10 Oct 2024 22:00:41 +1000 Subject: [PATCH 3/4] fixing text displays Hopefully fixing newline nonsense for displaying the login text stuff --- client/drivers/data_structures.py | 97 ++----------------------------- client/drivers/login_system.py | 30 +++++----- 2 files changed, 21 insertions(+), 106 deletions(-) diff --git a/client/drivers/data_structures.py b/client/drivers/data_structures.py index 7de3092..cbb020a 100644 --- a/client/drivers/data_structures.py +++ b/client/drivers/data_structures.py @@ -135,21 +135,6 @@ def make_failed(cls) -> "ControlledData": # SECTION: Getters/Setters - def DEBUG_get_next_posture_graph_value(self) -> int: - """ - Get next thing to put on the DEBUG graph. - - Returns: - (int): Next thing to put on the DEBUG graph. - - TODO: Remove this method - """ - return_me = self._DEBUG_current_graph_function( - self._DEBUG_current_graph_list_index - ) - self._DEBUG_current_graph_list_index += 1 - return return_me - def is_failed(self) -> bool: """ Returns: @@ -216,80 +201,6 @@ def set_last_plant_time(self, time: datetime) -> None: """ self._last_plant_time = time - def get_last_sniff_time(self) -> datetime: - """ - Returns: - (datetime): The last time that the user was provided olfactory feedback. - """ - return self._last_cushion_time - - def set_last_sniff_time(self, time: datetime) -> None: - """ - Args: - time : datetime - The last time that the user was provided olfactory feedback. - """ - self._last_sniff_time = time - - def accept_new_posture_data( - self, posture_data: List[float] - ) -> None: # TODO: Refine type signature - """ - Update the internal store of posture data for the OLED display. - - Args: - posture_data : List[float] - New posture data to accept and merge with the current state of this object. - - TODO: Implement me! - """ - # DEBUG: - print(" accept_new_posture_data()") - # :DEBUG - for datum in posture_data: - self._posture_data.put_nowait(datum) - - # SECTION: Posture data mapping - - def get_cushion_posture_data( - self, - ) -> "CUSHION_POSTURE_DATA": # TODO: Decide what this type looks like - """ - Returns: - (CUSHION_POSTURE_DATA): Posture data necessary for cushion feedback. - TODO: Implement this. - """ - # DEBUG: - print(" WARNING: get_cushion_posture_data() not implemented!") - # :DEBUG - return None - - def get_plant_posture_data( - self, - ) -> "PLANT_POSTURE_DATA": # TODO: Decide what this type looks like - """ - Returns: - (PLANT_POSTURE_DATA) Posture data necessary for plant feedback. - TODO: Implement this. - """ - # DEBUG: - print(" WARNING: get_plant_posture_data() not implemented!") - # :DEBUG - return None - - def get_sniff_posture_data( - self, - ) -> "SNIFF_POSTURE_DATA": # TODO: Decide what this type looks like - """ - Returns: - (SNIFF_POSTURE_DATA): Posture data necessary for scent feedback. - TODO: Implement this. - """ - # DEBUG: - print(" WARNING: get_sniff_posture_data() not implemented!") - # :DEBUG - return None - ## SECTION: Hardware packaged together @@ -574,7 +485,7 @@ def oled_display_texts(self, texts: List[str], x: int, y: int, colour: int) -> i ) return display_height_offset - def send_message(self, message: str, message_time: int = 1) -> None: + def send_message(self, messages: List[str], message_time: int = 1) -> None: """Clear the screen and display message Args: @@ -582,7 +493,11 @@ def send_message(self, message: str, message_time: int = 1) -> None: message_time: Time (seconds) to sleep for after displaying message. """ self.display.fill(0) - self.oled_display_text(message, 0, 0, 1) + display_height_offset = 0 + for text in messages: + display_height_offset = self.oled_display_text( + text, 0, 0 + display_height_offset, 1 + ) self.display.show() time.sleep(message_time) diff --git a/client/drivers/login_system.py b/client/drivers/login_system.py index d5adbb0..0ce37fa 100644 --- a/client/drivers/login_system.py +++ b/client/drivers/login_system.py @@ -44,9 +44,9 @@ def handle_authentication(hardware: HardwareComponents) -> int: while True: _log_and_send( hardware, - "Left button to login\n" - "Right button to register\n" - "Double press right button to reset data", + ["Left button to login", + "Right button to register", + "Double press right button to reset data"] ) button = hardware.wait_for_button_press() @@ -82,8 +82,8 @@ def _loop_action(hardware: HardwareComponents, action: Action) -> int: def _attempt_login(hardware: HardwareComponents) -> int: capturer = RaspCapturer() - message = f"Press left button to take photo\n{QUIT_INSTRUCTIONS}" - _log_and_send(hardware, message, message_time=0) + messages = ["Press left button to take photo", f"{QUIT_INSTRUCTIONS}"] + _log_and_send(hardware, messages, message_time=0) button_pressed = hardware.wait_for_button_press() if button_pressed == LEFT_BUTTON: @@ -92,7 +92,7 @@ def _attempt_login(hardware: HardwareComponents) -> int: if button_pressed == RIGHT_BUTTON: return QUIT - _log_and_send(hardware, "Trying login...", message_time=0) + _log_and_send(hardware, ["Trying login..."], message_time=0) status = get_face_match(face) _handle_status_message(hardware, status) @@ -105,11 +105,11 @@ def _attempt_register(hardware: HardwareComponents) -> int: # Capture NUM_FACES faces faces: list[np.ndarray] = [] for i in range(NUM_FACES): - message = ( - f"Press left button to take photo {i + 1}/{NUM_FACES}\n" + messages = [ + f"Press left button to take photo {i + 1}/{NUM_FACES}", f"{QUIT_INSTRUCTIONS}" - ) - _log_and_send(hardware, message, message_time=0) + ] + _log_and_send(hardware, messages, message_time=0) button_pressed = hardware.wait_for_button_press() if button_pressed == RIGHT_BUTTON: @@ -120,13 +120,13 @@ def _attempt_register(hardware: HardwareComponents) -> int: faces.append(frame) # Try register faces - _log_and_send(hardware, "Registering...") + _log_and_send(hardware, ["Registering..."]) user_id = next_user_id() status = register_faces(user_id, faces) if status == Status.OK.value: create_user() - _log_and_send(hardware, "Registration successful!") + _log_and_send(hardware, ["Registration successful!"]) return user_id _handle_status_message(hardware, status) @@ -144,7 +144,7 @@ def _handle_status_message(hardware: HardwareComponents, status: int) -> None: def _log_and_send( - hardware: HardwareComponents, message: str, message_time: int = 1 + hardware: HardwareComponents, messages: list[str], message_time: int = 1 ) -> None: - logger.debug(message) - hardware.send_message(message, message_time=message_time) + logger.debug(messages) + hardware.send_message(messages, message_time=message_time) From 7c16db862e9e394f0f80c178d5ae1f77e244820b Mon Sep 17 00:00:00 2001 From: polypies73 <112163783+polypies73@users.noreply.github.com> Date: Thu, 10 Oct 2024 23:19:31 +1000 Subject: [PATCH 4/4] Display messages shorter display messages, that should actually be on new lines --- client/drivers/data_structures.py | 18 ++++++++++++++++++ client/drivers/login_system.py | 12 ++++++------ 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/client/drivers/data_structures.py b/client/drivers/data_structures.py index cbb020a..12cca77 100644 --- a/client/drivers/data_structures.py +++ b/client/drivers/data_structures.py @@ -201,6 +201,24 @@ def set_last_plant_time(self, time: datetime) -> None: """ self._last_plant_time = time + def accept_new_posture_data( + self, posture_data: List[float] + ) -> None: # TODO: Refine type signature + """ + Update the internal store of posture data for the OLED display. + + Args: + posture_data : List[float] + New posture data to accept and merge with the current state of this object. + + TODO: Implement me! + """ + # DEBUG: + print(" accept_new_posture_data()") + # :DEBUG + for datum in posture_data: + self._posture_data.put_nowait(datum) + ## SECTION: Hardware packaged together diff --git a/client/drivers/login_system.py b/client/drivers/login_system.py index 0ce37fa..15ed2ce 100644 --- a/client/drivers/login_system.py +++ b/client/drivers/login_system.py @@ -25,7 +25,7 @@ Status.TOO_MANY_FACES.value: "Too many faces detected", Status.NO_MATCH.value: "Could not match face", } -QUIT_INSTRUCTIONS = "Right button to quit" +QUIT_INSTRUCTIONS = "Right: quit" Action = Callable[[HardwareComponents], int] @@ -44,9 +44,9 @@ def handle_authentication(hardware: HardwareComponents) -> int: while True: _log_and_send( hardware, - ["Left button to login", - "Right button to register", - "Double press right button to reset data"] + ["Left: login", + "Right: register", + "Double press right: reset data"] ) button = hardware.wait_for_button_press() @@ -82,7 +82,7 @@ def _loop_action(hardware: HardwareComponents, action: Action) -> int: def _attempt_login(hardware: HardwareComponents) -> int: capturer = RaspCapturer() - messages = ["Press left button to take photo", f"{QUIT_INSTRUCTIONS}"] + messages = ["Left: take photo", f"{QUIT_INSTRUCTIONS}"] _log_and_send(hardware, messages, message_time=0) button_pressed = hardware.wait_for_button_press() @@ -106,7 +106,7 @@ def _attempt_register(hardware: HardwareComponents) -> int: faces: list[np.ndarray] = [] for i in range(NUM_FACES): messages = [ - f"Press left button to take photo {i + 1}/{NUM_FACES}", + f"Left: take photo {i + 1}/{NUM_FACES}", f"{QUIT_INSTRUCTIONS}" ] _log_and_send(hardware, messages, message_time=0)