From 3df524dd9ea5a6ef1b252e94e90e82c6d888dc5b Mon Sep 17 00:00:00 2001 From: MitchellJC <81349046+MitchellJC@users.noreply.github.com> Date: Fri, 4 Oct 2024 11:11:02 +1000 Subject: [PATCH 1/2] ref: deleted ai_bros.py --- client/drivers/ai_bros.py | 80 --------------------------------------- 1 file changed, 80 deletions(-) delete mode 100644 client/drivers/ai_bros.py diff --git a/client/drivers/ai_bros.py b/client/drivers/ai_bros.py deleted file mode 100644 index 303c162..0000000 --- a/client/drivers/ai_bros.py +++ /dev/null @@ -1,80 +0,0 @@ -## SECTION: Imports - -from typing import Tuple -from PiicoDev_Switch import PiicoDev_Switch -from PiicoDev_SSD1306 import * -import threading -from datetime import datetime -from data.routines import * -# from models.face_rec.routines import get_user_from_face # TODO: Fix this import later... - -#from PiicoDev_Unified import sleep_ms - -from data_structures import ControlledData, HardwareComponents, Picture, Face - - -def ai_bros_face_recogniser(underlying_picture : int) -> Face: # TODO: Refine type signature - """ - Recognise a face, powered by AI. - - Args: - underlying_picture : UNDERLYING_PICTURE - The picture to pass to the face recogniser. This data passing may be handled differently - in the final version. - Returns: - (Face): Failed, matched or unmatched Face - TODO: Convert this into an external API call. Currently returns debug data. - """ - # DEBUG: - print(" ai_bros_face_recogniser()") - DEBUG_failed = False - DEBUG_unmatched = None - DEBUG_user_id = 1 - - # try: - # returned_user = get_user_from_face(underlying_picture) - # except NotImplementedError: - # returned_user = -1 - #returned_user = DEBUG_unmatched - - # :DEBUG - if DEBUG_failed: - return Face.make_failed() - # if returned_user is None: # DEBUG - # return Face.make_unmatched() # DEBUG - return Face.make_matched(DEBUG_user_id) - # return Face.make_matched(returned_user.id_) # DEBUG - -def ai_bros_posture_score(underlying_picture : "UNDERLYING_PICTURE") -> int: # TODO: Refine type signature - """ - Args: - underlying_picture : UNDERLYING_PICTURE - The picture of the person's posture - Returns: - int: score representing how good the posture currently is??? - TODO: Convert this into an external API call. Currently returns debug data. - NOTE: This will eventually be a database lookup. We're running the AI posture peeker - asynchronously to the controller code. - FIXME: This documentation is terrible - """ - return 1 - -def ai_bros_get_posture_data(last_snapshot_time : datetime) -> "POSTURE_DATA": # TODO: Refine type signature - """ - API call to get posture data from the SQLite database. - Gets all data from after last_snapshot_time until the current time. - - Args: - last_snapshot_time : datetime - The last time we read the posture data. - - Returns: - (POSTURE_DATA): Posture data returned from the API call. - - TODO: Actually implement this method. Currently prints a debug method and returns an empty list. - """ - # DEBUG: - print(" ai_bros_get_posture_data()") - DEBUG_return_value = [] - # :DEBUG - return DEBUG_return_value From 0241e66543b4fbb7e1ed5cf689670d917a05ed2d Mon Sep 17 00:00:00 2001 From: MitchellJC <81349046+MitchellJC@users.noreply.github.com> Date: Fri, 4 Oct 2024 11:43:08 +1000 Subject: [PATCH 2/2] ref: removed ai_bros references from driver main --- client/drivers/main.py | 158 ++++++++++++++++++++++++----------------- 1 file changed, 93 insertions(+), 65 deletions(-) diff --git a/client/drivers/main.py b/client/drivers/main.py index 96a0b71..d3a0e85 100644 --- a/client/drivers/main.py +++ b/client/drivers/main.py @@ -20,11 +20,9 @@ from datetime import datetime, timedelta from data_structures import ControlledData, HardwareComponents, Picture, Face -from ai_bros import * from data.routines import * - ## SECTION: Global constants CUSHION_GPIO_PIN = 8 @@ -45,33 +43,35 @@ LOGOUT_SUCCESS_DELAY = 3000 """ Number of milliseconds between the user successfully logging out and returning to main(). """ -GET_POSTURE_DATA_TIMEOUT = timedelta(milliseconds = 2000) # DEBUG: Change this value up to ~60000 later. +GET_POSTURE_DATA_TIMEOUT = timedelta( + milliseconds=2000 +) # DEBUG: Change this value up to ~60000 later. """ Minimum delay between reading posture data from the SQLite database, in do_everything(). """ PROPORTION_IN_FRAME_THRESHOLD = 0.3 """ Proportion of the time the user must be in frame for any feedback to be given. FIXME: Fine-tune this value later. """ -HANDLE_CUSHION_FEEDBACK_TIMEOUT = timedelta(milliseconds = 5000) +HANDLE_CUSHION_FEEDBACK_TIMEOUT = timedelta(milliseconds=5000) """ Minimum delay between consecutive uses of the vibration motor. Used in handle_feedback(). """ -CUSHION_ACTIVE_INTERVAL = timedelta(milliseconds = 1000) +CUSHION_ACTIVE_INTERVAL = timedelta(milliseconds=1000) """ Length of time for which the vibration motor should vibrate. Used in handle_cushion_feedback(). """ CUSHION_PROPORTION_GOOD_THRESHOLD = 0.5 """ Threshold for vibration cushion feedback. If the proportion of "good" sitting posture is below this, the cushion will vibrate. """ -HANDLE_PLANT_FEEDBACK_TIMEOUT = timedelta(milliseconds = 10000) +HANDLE_PLANT_FEEDBACK_TIMEOUT = timedelta(milliseconds=10000) """ Minimum delay between consecutive uses of the plant-controlling servos. Used in handle_feedback(). """ -HANDLE_SNIFF_FEEDBACK_TIMEOUT = timedelta(milliseconds = 20000) +HANDLE_SNIFF_FEEDBACK_TIMEOUT = timedelta(milliseconds=20000) """ Minimum delay between consecutive uses of the scent bottle-controlling servos. Used in handle_feedback(). """ DEBUG_DO_EVERYTHING_INTERVAL = 1000 """ DEBUG Number of milliseconds between each loop iteration in do_everything(). """ - ## SECTION: main() + def main(): """ Entry point for the control program. @@ -80,7 +80,7 @@ def main(): print(" main()") # :DEBUG - global hardware + global hardware hardware = initialise_hardware() init_database() @@ -95,9 +95,9 @@ def main(): do_everything(main_data) - ## SECTION: Hardware initialisation + # 2024-09-01_15-29 Gabe: TESTED. for buttons and OLED display. def initialise_hardware() -> HardwareComponents: """ @@ -112,10 +112,10 @@ def initialise_hardware() -> HardwareComponents: TODO: Complete the function with all of the hardware peripherals (incrementally, as they get integrated). """ - print(" initialise_hardware()") # DEBUG + print(" initialise_hardware()") # DEBUG return_me = HardwareComponents.make_fresh() # Clear button queues - return_me.button0.was_pressed + return_me.button0.was_pressed return_me.button1.was_pressed # Set up GPIO pins GPIO.setmode(GPIO.BCM) # Same pin numbering convention as the PiicoDev header @@ -123,20 +123,20 @@ def initialise_hardware() -> HardwareComponents: # Write low to stop buzzer from mistakenly buzzing, if necessary GPIO.output(CUSHION_GPIO_PIN, GPIO.LOW) - print(" initialise_hardware() FINISHED") # DEBUG + print(" initialise_hardware() FINISHED") # DEBUG return return_me - ## SECTION: Login handling + # 2024-09-01_15-52 Gabe: TESTED. def wait_for_login_attempt() -> bool: """ Waits until the user attempts to log in. Returns: - (bool): True when the user attempts to log in. + (bool): True when the user attempts to log in. """ print(" BEGIN wait_for_login_attempt()") @@ -153,10 +153,11 @@ def wait_for_login_attempt() -> bool: # Clear the display hardware.display.fill(0) hardware.display.show() - print(" END wait_for_login_attempt()") # DEBUG + print(" END wait_for_login_attempt()") # DEBUG return True sleep_ms(WAIT_FOR_LOGIN_POLLING_INTERVAL) + # 2024-09-01 17:06 Gabe: TESTED., assuming ai_bros_face_recogniser() does what it should do. def attempt_login() -> ControlledData: """ @@ -185,32 +186,35 @@ def attempt_login() -> ControlledData: hardware.display.show() picture = take_picture() if picture.failed: - print(' Picture Failed') # DEBUG + print(" Picture Failed") # DEBUG hardware.display.fill(0) hardware.oled_display_text(PICTURE_FAILED_MESSAGE, 0, 0, 1) hardware.display.show() sleep_ms(LOGIN_TAKE_PICTURE_INTERVAL) continue - face = ai_bros_face_recogniser(picture.underlying_picture) # TODO: This should be an external API call. + face = Face.make_matched(1) if face.failed: - print(" AI has failed us") # DEBUG + print(" AI has failed us") # DEBUG hardware.display.fill(0) hardware.oled_display_text(AI_FAILED_MESSAGE, 0, 0, 1) hardware.display.show() sleep_ms(LOGIN_TAKE_PICTURE_INTERVAL) continue if face.matched: - print(" Mega W for AI") # DEBUG + print(" Mega W for AI") # DEBUG return ControlledData.make_empty(face.user_id) elif ask_create_new_user(): - return ControlledData.make_empty(create_new_user(picture.underlying_picture)) + return ControlledData.make_empty( + create_new_user(picture.underlying_picture) + ) # Tell the user the login failed - print(" attempt_login(): Totally failed lol") # DEBUG + print(" attempt_login(): Totally failed lol") # DEBUG hardware.display.fill(0) hardware.oled_display_text(LOGIN_TOTALLY_FAILED_MESSAGE, 0, 0, 1) hardware.display.show() sleep_ms(FAIL_LOGIN_DELAY) - return ControlledData.make_failed() + return ControlledData.make_failed() + def take_picture() -> Picture: """ @@ -224,6 +228,7 @@ def take_picture() -> Picture: # :DEBUG return DEBUG_return_value + # 2024-09-01 17:06 Gabe: TESTED. def ask_create_new_user() -> bool: """ @@ -237,7 +242,12 @@ def ask_create_new_user() -> bool: """ print(" BEGIN ask_create_new_user()") - CREATE_NEW_USER_MESSAGES = ["No face matched.", "Create new user?", "button0: no", "button1: yes"] + CREATE_NEW_USER_MESSAGES = [ + "No face matched.", + "Create new user?", + "button0: no", + "button1: yes", + ] # Display to screen hardware.display.fill(0) hardware.oled_display_texts(CREATE_NEW_USER_MESSAGES, 0, 0, 1) @@ -251,17 +261,18 @@ def ask_create_new_user() -> bool: # Clear the display hardware.display.fill(0) hardware.display.show() - print(" END ask_create_new_user(): do NOT create new user") # DEBUG + print(" END ask_create_new_user(): do NOT create new user") # DEBUG return False if hardware.button1.was_pressed: # Clear the display hardware.display.fill(0) hardware.display.show() - print(" END ask_create_new_user(): DO create new user") # DEBUG + print(" END ask_create_new_user(): DO create new user") # DEBUG return True sleep_ms(ASK_CREATE_NEW_USER_POLLING_INTERVAL) -def create_new_user(underlying_picture : int) -> int: + +def create_new_user(underlying_picture: int) -> int: """ Create a new user based on the given picture, and return their user id. @@ -284,13 +295,13 @@ def create_new_user(underlying_picture : int) -> int: # return new_user_id # DEBUG - ## SECTION: Control for the logged-in user + # 2024-09-02 07-03 Gabe: Currently working the following here: # top-level control flow # interaction with buttons and display -def do_everything(auspost : ControlledData) -> None: +def do_everything(auspost: ControlledData) -> None: """ Main control flow once a user is logged in. @@ -298,7 +309,7 @@ def do_everything(auspost : ControlledData) -> None: (auspost : ControlledData): Data encapsulating the current state of the program. Requires: ! auspost.is_failed() - + TODO: Actually implement this """ print(" BEGIN do_everything()") @@ -314,7 +325,7 @@ def do_everything(auspost : ControlledData) -> None: hardware.oled_display_text(LOGIN_MESSAGE, 0, 0, 1) hardware.display.show() sleep_ms(LOGIN_SUCCESS_DELAY) - + # Clear button queues hardware.button0.was_pressed hardware.button1.was_pressed @@ -323,7 +334,7 @@ def do_everything(auspost : ControlledData) -> None: hardware.display.show() while True: - # Loop invariant: ! auspost.is_failed() + # Loop invariant: ! auspost.is_failed() # Check for user logout if hardware.button0.was_pressed: hardware.display.fill(0) @@ -331,8 +342,8 @@ def do_everything(auspost : ControlledData) -> None: hardware.display.show() sleep_ms(LOGOUT_SUCCESS_DELAY) print(" END do_everything()") - return - + return + # Probably should run individual threads for each of these # TODO: Move the threading to a more reasonable location. main() is probably best. # posture_monitoring_thread = threading.Thread(handle_posture_monitoring, args=(auspost)) @@ -344,15 +355,16 @@ def do_everything(auspost : ControlledData) -> None: sleep_ms(DEBUG_DO_EVERYTHING_INTERVAL) + # 2024-09-13 11-32 Gabe: TESTED. -def update_display_screen(auspost : ControlledData) -> bool: +def update_display_screen(auspost: ControlledData) -> bool: """ Update the display screen with whatever needs to be on there. We will display: As per HardwareComponents.get_control_messages(), and Current-session posture graph - Args: + Args: (auspost : ControlledData): Data encapsulating the current state of the program. Returns: @@ -365,16 +377,21 @@ def update_display_screen(auspost : ControlledData) -> bool: print(" BEGIN update_display_screen()") hardware.display.fill(0) - hardware.oled_display_texts(hardware.get_control_messages(auspost.get_user_id()), 0, 0, 1) + hardware.oled_display_texts( + hardware.get_control_messages(auspost.get_user_id()), 0, 0, 1 + ) if not auspost.get_posture_data().empty(): - hardware.display.updateGraph2D(hardware.posture_graph, auspost.get_posture_data().get_nowait()) + hardware.display.updateGraph2D( + hardware.posture_graph, auspost.get_posture_data().get_nowait() + ) # Render hardware.display.show() print(" END update_display_screen()") return True -def handle_posture_monitoring(auspost : ControlledData) -> bool: + +def handle_posture_monitoring(auspost: ControlledData) -> bool: """ Take a snapshot monitoring the user, and update the given ControlledData if necessary. @@ -386,7 +403,7 @@ def handle_posture_monitoring(auspost : ControlledData) -> bool: ! auspost.is_failed() Ensures: ! auspost.is_failed() - + TODO: Implement error handling WARNING: UNTESTED! """ @@ -394,20 +411,21 @@ def handle_posture_monitoring(auspost : ControlledData) -> bool: print(" handle_posture_monitoring()") # :DEBUG now = datetime.now() - if (now > auspost.get_last_snapshot_time() + GET_POSTURE_DATA_TIMEOUT): + if now > auspost.get_last_snapshot_time() + GET_POSTURE_DATA_TIMEOUT: # TODO: The ai_bros_get_posture_data() call might fail once it's implemented properly. # If it does, we need to handle it properly. - auspost.accept_new_posture_data(ai_bros_get_posture_data(auspost.get_last_snapshot_time())) + auspost.accept_new_posture_data([]) # DEBUG: auspost.accept_new_posture_data([auspost.DEBUG_get_next_posture_graph_value()]) # :DEBUG auspost.set_last_snapshot_time(now) return True -def handle_feedback(auspost : ControlledData) -> bool: + +def handle_feedback(auspost: ControlledData) -> bool: """ Provide feedback to the user if necessary. - + Args: (auspost : ControlledData): Data encapsulating the current state of the program. Returns: @@ -417,27 +435,30 @@ def handle_feedback(auspost : ControlledData) -> bool: Ensures: ! auspost.is_failed() """ - if (datetime.now() > auspost.get_last_cushion_time() + HANDLE_CUSHION_FEEDBACK_TIMEOUT): + if ( + datetime.now() + > auspost.get_last_cushion_time() + HANDLE_CUSHION_FEEDBACK_TIMEOUT + ): if not handle_cushion_feedback(auspost): return False - if (datetime.now() > auspost.get_last_plant_time() + HANDLE_PLANT_FEEDBACK_TIMEOUT): + if datetime.now() > auspost.get_last_plant_time() + HANDLE_PLANT_FEEDBACK_TIMEOUT: if not handle_plant_feedback(auspost): return False - if (datetime.now() > auspost.get_last_sniff_time() + HANDLE_SNIFF_FEEDBACK_TIMEOUT): + if datetime.now() > auspost.get_last_sniff_time() + HANDLE_SNIFF_FEEDBACK_TIMEOUT: if not handle_sniff_feedback(auspost): return False - - return True + return True ## SECTION: Feedback handling + # 2024-09-15_20-18 Gabe: TESTED. -def handle_cushion_feedback(auspost : ControlledData) -> bool: +def handle_cushion_feedback(auspost: ControlledData) -> bool: """ Vibrate cushion (if necessary), and update the timestamp of when cushion feedback was last given. - + Args: (auspost : ControlledData): Data encapsulating the current state of the program. Returns: @@ -446,7 +467,7 @@ def handle_cushion_feedback(auspost : ControlledData) -> bool: ! auspost.is_failed() Ensures: ! auspost.is_failed() - + TODO: Implement this method. Currently prints a debug statement and updates the time. """ # DEBUG: @@ -456,12 +477,12 @@ def handle_cushion_feedback(auspost : ControlledData) -> bool: # Load posture records within the last HANDLE_CUSHION_FEEDBACK_TIMEOUT now = datetime.now() recent_posture_data = get_user_postures( - auspost.get_user_id(), - num = -1, - period_start = now - HANDLE_CUSHION_FEEDBACK_TIMEOUT, - period_end = now + auspost.get_user_id(), + num=-1, + period_start=now - HANDLE_CUSHION_FEEDBACK_TIMEOUT, + period_end=now, ) - + # Conditions for exiting early # 2024-09-15_19-47 Gabe: TESTED. if len(recent_posture_data) == 0: @@ -469,13 +490,19 @@ def handle_cushion_feedback(auspost : ControlledData) -> bool: auspost.set_last_cushion_time(datetime.now()) return True # 2024-09-15_20-18 Gabe: TESTED. - average_prop_in_frame = sum([posture.prop_in_frame for posture in recent_posture_data]) / len(recent_posture_data) + average_prop_in_frame = sum( + [posture.prop_in_frame for posture in recent_posture_data] + ) / len(recent_posture_data) if average_prop_in_frame < PROPORTION_IN_FRAME_THRESHOLD: - print(" Exiting handle_cushion_feedback() early: Not in frame for a high enough proportion of time.") + print( + " Exiting handle_cushion_feedback() early: Not in frame for a high enough proportion of time." + ) auspost.set_last_cushion_time(datetime.now()) return True # 2024-09-15_20-18 Gabe: TESTED. - average_prop_good = sum([posture.prop_good for posture in recent_posture_data]) / len(recent_posture_data) + average_prop_good = sum( + [posture.prop_good for posture in recent_posture_data] + ) / len(recent_posture_data) if average_prop_good >= CUSHION_PROPORTION_GOOD_THRESHOLD: print(" Exiting handle_cushion_feedback() early: You sat well :)") auspost.set_last_cushion_time(datetime.now()) @@ -494,7 +521,8 @@ def handle_cushion_feedback(auspost : ControlledData) -> bool: auspost.set_last_cushion_time(datetime.now()) return True -def handle_plant_feedback(auspost : ControlledData) -> bool: + +def handle_plant_feedback(auspost: ControlledData) -> bool: """ Set the plant height according to short-term current session data, and update the timestamp of when plant feedback was last given. @@ -507,7 +535,7 @@ def handle_plant_feedback(auspost : ControlledData) -> bool: ! auspost.is_failed() Ensures: ! auspost.is_failed() - + TODO: Implement this method. Currently prints a debug statement and updates the time. """ # DEBUG: @@ -516,7 +544,8 @@ def handle_plant_feedback(auspost : ControlledData) -> bool: auspost.set_last_plant_time(datetime.now()) return True -def handle_sniff_feedback(auspost : ControlledData) -> bool: + +def handle_sniff_feedback(auspost: ControlledData) -> bool: """ Dispense olfactory reward (if necessary), and update the timestamp of when olfactory feedback was last given. @@ -529,7 +558,7 @@ def handle_sniff_feedback(auspost : ControlledData) -> bool: ! auspost.is_failed() Ensures: ! auspost.is_failed() - + TODO: Implement this method. Currently prints a debug statement and updates the time. """ # DEBUG: @@ -539,7 +568,6 @@ def handle_sniff_feedback(auspost : ControlledData) -> bool: return True - ## LAUNCH if __name__ == "__main__":