diff --git a/client/drivers/camera_overlord.py b/client/drivers/camera_overlord.py index f55ea51..1e74c30 100644 --- a/client/drivers/camera_overlord.py +++ b/client/drivers/camera_overlord.py @@ -40,13 +40,13 @@ def handle_quit(signo, frame): f = open("/tmp/snapshot.jpg", "x") f.close() except FileExistsError: - print("Snapshot already exists") + logger.debug("Snapshot already exists") try: f = open("/tmp/big_brother.jpg", "x") f.close() except FileExistsError: - print("Big Brother already exists") + logger.debug("Big Brother already exists") while True: for _ in range(2 * 3): diff --git a/client/drivers/data_structures.py b/client/drivers/data_structures.py index 6aafa07..383e493 100644 --- a/client/drivers/data_structures.py +++ b/client/drivers/data_structures.py @@ -5,6 +5,7 @@ Gabriel Field (47484306), Mitchell Clark """ +import logging import time from datetime import datetime from math import pi, sin @@ -22,10 +23,12 @@ RIGHT_BUTTON = 1 DOUBLE_RIGHT_BUTTON = 2 +logger = logging.getLogger(__name__) + class ControlledData: """ - Data for passing around in client/drivers/main.do_everything(). + Data for passing around in client/drivers/main.run_user_session(). There should only ever be one object of this class at a time. @@ -80,7 +83,9 @@ def make_empty(cls, user_id: int) -> "ControlledData": return_me._last_snapshot_time = datetime.now() return_me._last_cushion_time = datetime.now() return_me._last_plant_time = datetime.now() - print(" Made a new empty ControlledData() with user_id", return_me._user_id) + logger.debug( + " Made a new empty ControlledData() with user_id %d", return_me._user_id + ) return return_me @classmethod @@ -156,21 +161,19 @@ def set_last_plant_time(self, time: datetime) -> None: """ self._last_plant_time = time - def accept_new_posture_data( - self, posture_data: List[float] - ) -> None: # TODO: Refine type signature + def accept_new_posture_data(self, posture_data: List[float]) -> None: """ Update the internal store of posture data for the OLED display. Args: posture_data: new posture data to accept and merge with the current state of this object. - TODO: Implement me! """ - print(" accept_new_posture_data()") + logger.debug(" accept_new_posture_data()") for datum in posture_data: self._posture_data.put_nowait(datum) + class HardwareComponents: """ Hardware components packaged together into a class. @@ -231,7 +234,7 @@ class HardwareComponents: _PLANT_MOVER_PERIOD: float = 1000 * 60 / 55 _BASE_FULL_SPEED = 0.1 _FULL_SPEED_UPWARDS = _BASE_FULL_SPEED * (4 / 7) * (8 / 9) * 2 - _FULL_SPEED_DOWNWARDS = (-1) * _BASE_FULL_SPEED * (4 / 5) * 2 + _FULL_SPEED_DOWNWARDS = (-1) * _BASE_FULL_SPEED * (6 / 10) * 2 # SECTION: Constructors @@ -277,7 +280,7 @@ def get_control_messages(self, user_id: int) -> List[str]: Returns: The messages to display to the user during the main application loop. """ - return ["b0: logout", "id: " + str(user_id)] + return ["Left: logout", "ID: " + str(user_id)] def initialise_posture_graph(self, user_id: int) -> None: """ @@ -326,6 +329,9 @@ def unwind_plant(self) -> None: """ self.plant_mover.speed = self._FULL_SPEED_UPWARDS time.sleep(16 * self._PLANT_MOVER_PERIOD * self._PLANT_GEAR_RATIO / 1000) + self.plant_height = ( + self._PLANT_SHAFT_TURNS - self._PLANT_SHAFT_SAFETY_BUFFER_TURNS + ) self.plant_mover.speed = 0 def wind_plant_safe(self) -> None: @@ -333,15 +339,16 @@ def wind_plant_safe(self) -> None: Wind the plant down to its minimum (safe) height. Will also reset the `plant_height` to `0`. """ - self.plant_mover.speed = self._FULL_SPEED_DOWNWARDS - time.sleep( - (self._PLANT_SHAFT_TURNS - self._PLANT_SHAFT_SAFETY_BUFFER_TURNS) - * self._PLANT_MOVER_PERIOD - * self._PLANT_GEAR_RATIO - / 1000 - ) - self.plant_mover.speed = 0 - self.plant_height = 0 + self.set_plant_height(0) + # self.plant_mover.speed = self._FULL_SPEED_DOWNWARDS + # time.sleep( + # (self._PLANT_SHAFT_TURNS - self._PLANT_SHAFT_SAFETY_BUFFER_TURNS) + # * self._PLANT_MOVER_PERIOD + # * self._PLANT_GEAR_RATIO + # / 1000 + # ) + # self.plant_mover.speed = 0 + # self.plant_height = 0 def set_plant_height(self, new_height: int) -> None: """ @@ -351,7 +358,7 @@ def set_plant_height(self, new_height: int) -> None: new_height: height to which to drive the I. Jensen Plant Mover 10000 """ self.plant_mover.speed = 0 - print(f" set_plant_height: {self.plant_height=}, {new_height=}") + logger.debug(f" set_plant_height: {self.plant_height=}, {new_height=}") distance = new_height - self.plant_height distance = distance if distance > 0 else (-1) * distance if new_height == self.plant_height: @@ -361,13 +368,13 @@ def set_plant_height(self, new_height: int) -> None: new_height > self._PLANT_SHAFT_TURNS - self._PLANT_SHAFT_SAFETY_BUFFER_TURNS - 1 ): - print( + logger.debug( " Plant mover not schmovin': can't get that high mate, that's just unsafe" ) self.plant_mover.speed = 0 return if new_height < 0: - print( + logger.debug( " Plant mover not schmovin': can't get that low mate, that's just dirty" ) self.plant_mover.speed = 0 diff --git a/client/drivers/main.py b/client/drivers/pi_overlord.py similarity index 65% rename from client/drivers/main.py rename to client/drivers/pi_overlord.py index cc417e6..9f0f87d 100644 --- a/client/drivers/main.py +++ b/client/drivers/pi_overlord.py @@ -28,51 +28,36 @@ #: Pin to which the vibration motor is attached. This is D8 on the PiicoDev header. CUSHION_GPIO_PIN = 8 -#: Number of milliseconds between each time the button is polled during wait_for_login_attempt(). -WAIT_FOR_LOGIN_POLLING_INTERVAL = 100 -#: Number of milliseconds between pictures taken for login attempts. -LOGIN_TAKE_PICTURE_INTERVAL = 1000 -#: Number of milliseconds between starting to attempt_login() and taking the first picture. -START_LOGIN_ATTEMPTS_DELAY = 3000 -#: Number of milliseconds between each time the button is polled during ask_create_new_user(). -ASK_CREATE_NEW_USER_POLLING_INTERVAL = 100 -#: Number of milliseconds between telling the user that login has completely failed and returning from attempt_login(). -FAIL_LOGIN_DELAY = 3000 #: Number of milliseconds between telling the user that login has succeeded and beginning real functionality. -LOGIN_SUCCESS_DELAY = 3000 +LOGIN_SUCCESS_DELAY = 1000 +#: Number of milliseconds between the user successfully logging out and returning to main(). +LOGOUT_SUCCESS_DELAY = 1000 -#: Width (in pixels) of each individual entry on the posture graph. (One pixel is hard to read.) -POSTURE_GRAPH_DATUM_WIDTH = 5 +#: Proportion of the time the user must be in frame for any feedback to be given. +PROPORTION_IN_FRAME_THRESHOLD = 0.4 -#: Number of milliseconds between the user successfully logging out and returning to main(). -LOGOUT_SUCCESS_DELAY = 3000 -#: Minimum delay between reading posture data from the SQLite database, in do_everything(). +#: Minimum delay between reading posture data from the SQLite database, in run_user_session(). GET_POSTURE_DATA_TIMEOUT = timedelta(milliseconds=10000) -#: Proportion of the time the user must be in frame for any feedback to be given. FIXME: Fine-tune this value later. -PROPORTION_IN_FRAME_THRESHOLD = 0.3 +#: Width (in pixels) of each individual entry on the posture graph. (One pixel is hard to read.) +POSTURE_GRAPH_DATUM_WIDTH = 5 +#: The number of data points to split the total data into, collected each time we read from the SQLite database. +NUM_DATA_POINTS_PER_TIMEOUT = 3 #: Minimum delay between consecutive uses of the vibration motor. Used in handle_feedback(). -HANDLE_CUSHION_FEEDBACK_TIMEOUT = timedelta(milliseconds=10000) # DEBUG +HANDLE_CUSHION_FEEDBACK_TIMEOUT = timedelta(milliseconds=30000) #: Length of time for which the vibration motor should vibrate. Used in handle_cushion_feedback(). -CUSHION_ACTIVE_INTERVAL = timedelta(milliseconds=1000) +CUSHION_ACTIVE_INTERVAL = timedelta(milliseconds=3000) #: Threshold for vibration cushion feedback. If the proportion of "good" sitting posture is below this, the cushion will vibrate. -#: FIXME: Fine-tune this value later. CUSHION_PROPORTION_GOOD_THRESHOLD = 0.5 #: Minimum delay between consecutive uses of the plant-controlling servos. Used in handle_feedback(). -HANDLE_PLANT_FEEDBACK_TIMEOUT = timedelta(milliseconds=2000) # DEBUG: used to be 10000 +HANDLE_PLANT_FEEDBACK_TIMEOUT = timedelta(milliseconds=15000) #: Threshold for I. Jensen Plant Mover 10000 feedback. If the proportion of "good" sitting posture is below this, #: the plant will move down. -#: FIXME: Fine-tune this value later. PLANT_PROPORTION_GOOD_THRESHOLD = 0.5 -""" -Threshold for I. Jensen Plant Mover 10000 feedback. If the proportion of "good" sitting posture is below this, -the plant will move down. -FIXME: Fine-tune this value later. -""" -#: DEBUG Number of milliseconds between each loop iteration in do_everything(). -DEBUG_DO_EVERYTHING_INTERVAL = 1000 +#: Number of milliseconds between each loop iteration in run_user_session(). +USER_SESSION_INTERVAL = 100 logger = logging.getLogger(__name__) @@ -123,8 +108,7 @@ def main(): logger.debug("Login successful") - # Run user session - do_everything(user) + run_user_session(user) # Let the posture tracking process if the user has logged out if not args.no_posture_model: @@ -146,7 +130,7 @@ def initialise_hardware() -> HardwareComponents: Object consisting of all hardware components connected to the Raspberry Pi. """ - print(" initialise_hardware()") + logger.debug(" initialise_hardware()") return_me = HardwareComponents.make_fresh() # Clear button queues return_me.button0.was_pressed @@ -157,28 +141,28 @@ def initialise_hardware() -> HardwareComponents: # Write low to stop buzzer from mistakenly buzzing, if necessary GPIO.output(CUSHION_GPIO_PIN, GPIO.LOW) - print(" initialise_hardware() FINISHED") + logger.debug(" initialise_hardware() FINISHED") return return_me # SECTION: Control for the logged-in user -def do_everything(auspost: ControlledData) -> None: +def run_user_session(user: ControlledData) -> None: """ Main control flow once a user is logged in. Args: - auspost: data encapsulating the current state of the program. + user: data encapsulating the current state of the program. Requires: - ! auspost.is_failed() + ! user.is_failed() """ - LOGIN_MESSAGE = "Logged in with user id: " + str(auspost.get_user_id()) - LOGOUT_MESSAGE = "Logged out user id " + str(auspost.get_user_id()) + LOGIN_MESSAGE = "Logged in with user id: " + str(user.get_user_id()) + LOGOUT_MESSAGE = "Logged out user id " + str(user.get_user_id()) # Initialise posture graph for the current session - hardware.initialise_posture_graph(auspost.get_user_id()) + hardware.initialise_posture_graph(user.get_user_id()) # Wind plant down all the way hardware.wind_plant_safe() @@ -195,7 +179,7 @@ def do_everything(auspost: ControlledData) -> None: # Set up initial display hardware.display.fill(0) hardware.oled_display_texts( - hardware.get_control_messages(auspost.get_user_id()), 0, 0, 1 + hardware.get_control_messages(user.get_user_id()), 0, 0, 1 ) hardware.display.show() @@ -206,17 +190,18 @@ def do_everything(auspost: ControlledData) -> None: hardware.oled_display_text(LOGOUT_MESSAGE, 0, 0, 1) hardware.display.show() sleep_ms(LOGOUT_SUCCESS_DELAY) - print(" END do_everything()") + logger.debug(" END run_user_session()") return - update_display_screen(auspost) - handle_posture_graph(auspost) - handle_feedback(auspost) + # Run core functionality + update_display_screen(user) + handle_posture_graph(user) + handle_feedback(user) - sleep_ms(DEBUG_DO_EVERYTHING_INTERVAL) + sleep_ms(USER_SESSION_INTERVAL) -def update_display_screen(auspost: ControlledData) -> bool: +def update_display_screen(user: ControlledData) -> bool: """ Update the display screen with whatever needs to be on there. @@ -225,54 +210,54 @@ def update_display_screen(auspost: ControlledData) -> bool: Current-session posture graph Args: - auspost: data encapsulating the current state of the program. + user: data encapsulating the current state of the program. Returns: True, always. If you get a False return value, then something has gone VERY wrong. Requires: - ! auspost.is_failed() + ! user.is_failed() Ensures: - ! auspost.is_failed() + ! user.is_failed() """ while ( - not auspost.get_posture_data().empty() + not user.get_posture_data().empty() ): # NOTE: This is much more robust than getting a fixed number of things out of the queue hardware.display.fill(0) + # Display user messages hardware.oled_display_texts( - hardware.get_control_messages(auspost.get_user_id()), 0, 0, 1 + hardware.get_control_messages(user.get_user_id()), 0, 0, 1 ) + # Display posture graph data hardware.display.updateGraph2D( - hardware.posture_graph, auspost.get_posture_data().get_nowait() + hardware.posture_graph, user.get_posture_data().get_nowait() ) hardware.display.show() return True -def handle_posture_graph(auspost: ControlledData) -> bool: +def handle_posture_graph(user: ControlledData) -> bool: """ Get a snapshot of the user's posture data. Use this information to update the data for the posture graph. Args: - (auspost : ControlledData): Data encapsulating the current state of the program. + (user : ControlledData): Data encapsulating the current state of the program. Returns: (bool): True, always. If you get a False return value, then something has gone VERY wrong. Requires: - ! auspost.is_failed() + ! user.is_failed() Ensures: - ! auspost.is_failed() - - TODO: Check this + ! user.is_failed() """ now = datetime.now() - if now > auspost.get_last_snapshot_time() + GET_POSTURE_DATA_TIMEOUT: + if now > user.get_last_snapshot_time() + GET_POSTURE_DATA_TIMEOUT: # Get the most recent posture data for the user recent_posture_data = get_user_postures( - auspost.get_user_id(), + user.get_user_id(), num=-1, period_start=now - GET_POSTURE_DATA_TIMEOUT, period_end=now, @@ -280,7 +265,9 @@ def handle_posture_graph(auspost: ControlledData) -> bool: # Exit if no data if len(recent_posture_data) == 0: - print(" Exiting handle_posture_monitoring_new() early: Not enough data") + logger.debug( + " Exiting handle_posture_monitoring_new() early: Not enough data" + ) return True # Exit if person not in frame enough @@ -288,10 +275,10 @@ def handle_posture_graph(auspost: ControlledData) -> bool: [posture.prop_in_frame for posture in recent_posture_data] ) / len(recent_posture_data) if average_prop_in_frame < PROPORTION_IN_FRAME_THRESHOLD: - print( + logger.debug( " Exiting handle_posturing_monitoring_new() early: Not in frame for a high enough proportion of time." ) - auspost.set_last_snapshot_time(datetime.now()) + user.set_last_snapshot_time(datetime.now()) return True # Sort the list by period_start @@ -301,208 +288,209 @@ def handle_posture_graph(auspost: ControlledData) -> bool: # Calculate total time span start_time = recent_posture_data[0].period_start - end_time = recent_posture_data[-1].period_end # DEBUG: Used to be period_start - total_time = ( - end_time - start_time - ) # BUG: This calculation sometimes results in `0`, which gets divided by later... + end_time = recent_posture_data[-1].period_end + total_time = end_time - start_time # Calculate the interval length - interval = total_time / POSTURE_GRAPH_DATUM_WIDTH + interval = total_time / NUM_DATA_POINTS_PER_TIMEOUT - # Setup sublists, where each sublist is a portion of the overall data + # Setup sublists, where each sublist is a portion of the overall data split_posture_lists: list[list[Posture]] - split_posture_lists = [[] for _ in range(POSTURE_GRAPH_DATUM_WIDTH)] + split_posture_lists = [[] for _ in range(NUM_DATA_POINTS_PER_TIMEOUT)] # What is in each sublist is determined by period_start # We want an approximately equal amount of data in each sublist for posture in recent_posture_data: index = min( - POSTURE_GRAPH_DATUM_WIDTH - 1, + NUM_DATA_POINTS_PER_TIMEOUT - 1, int((posture.period_start - start_time) // interval), ) split_posture_lists[index].append(posture) new_prop_good_data = [] - # Enqueue the average good posture for the graph to use + # Enqueue the average good posture for each data point for the graph to use for posture_list in split_posture_lists: if len(posture_list) == 0: continue - print(f" {posture_list=}") + logger.debug(f" {posture_list=}") average_prop_good = sum( [posture.prop_good for posture in posture_list] ) / len(posture_list) new_prop_good_data += [average_prop_good] * POSTURE_GRAPH_DATUM_WIDTH - auspost.accept_new_posture_data(new_prop_good_data) + user.accept_new_posture_data(new_prop_good_data) - auspost.set_last_snapshot_time(now) + user.set_last_snapshot_time(now) return True -def handle_feedback(auspost: ControlledData) -> bool: + +def handle_feedback(user: ControlledData) -> bool: """ Provide feedback to the user if necessary. Args: - auspost: Data encapsulating the current state of the program. + user: Data encapsulating the current state of the program. Returns: (bool): True, always. If you get a False return value, then something has gone VERY wrong. Requires: - ! auspost.is_failed() + ! user.is_failed() Ensures: - ! auspost.is_failed() + ! user.is_failed() """ - if ( - datetime.now() - > auspost.get_last_cushion_time() + HANDLE_CUSHION_FEEDBACK_TIMEOUT - ): - if not handle_cushion_feedback(auspost): + if datetime.now() > user.get_last_cushion_time() + HANDLE_CUSHION_FEEDBACK_TIMEOUT: + if not handle_cushion_feedback(user): return False - if datetime.now() > auspost.get_last_plant_time() + HANDLE_PLANT_FEEDBACK_TIMEOUT: - if not handle_plant_feedback(auspost): + if datetime.now() > user.get_last_plant_time() + HANDLE_PLANT_FEEDBACK_TIMEOUT: + if not handle_plant_feedback(user): return False return True -def handle_cushion_feedback(auspost: ControlledData) -> bool: +def handle_cushion_feedback(user: ControlledData) -> bool: """ Vibrate cushion (if necessary), and update the timestamp of when cushion feedback was last given. Args: - auspost: Data encapsulating the current state of the program. + user: Data encapsulating the current state of the program. Returns: True, always. If you get a False return value, then something has gone VERY wrong. Requires: - ! auspost.is_failed() + ! user.is_failed() Ensures: - ! auspost.is_failed() + ! user.is_failed() """ - print(" handle_cushion_feedback()") + logger.debug(" handle_cushion_feedback()") # Load posture records within the last HANDLE_CUSHION_FEEDBACK_TIMEOUT now = datetime.now() recent_posture_data = get_user_postures( - auspost.get_user_id(), + user.get_user_id(), num=-1, period_start=now - HANDLE_CUSHION_FEEDBACK_TIMEOUT, period_end=now, ) - # Conditions for exiting early + # Exit if no data if len(recent_posture_data) == 0: - print(" Exiting handle_cushion_feedback() early: No data") - auspost.set_last_cushion_time(datetime.now()) + logger.debug(" Exiting handle_cushion_feedback() early: No data") + user.set_last_cushion_time(datetime.now()) return True + # Exit if person not in frame enough average_prop_in_frame = sum( [posture.prop_in_frame for posture in recent_posture_data] ) / len(recent_posture_data) if average_prop_in_frame < PROPORTION_IN_FRAME_THRESHOLD: - print( + logger.debug( " Exiting handle_cushion_feedback() early: Not in frame for a high enough proportion of time." ) - auspost.set_last_cushion_time(datetime.now()) + user.set_last_cushion_time(datetime.now()) return True + + # Get average proportion of good posture average_prop_good = sum( [posture.prop_good for posture in recent_posture_data] ) / len(recent_posture_data) if average_prop_good >= CUSHION_PROPORTION_GOOD_THRESHOLD: - print(" Exiting handle_cushion_feedback() early: You sat well :)") - auspost.set_last_cushion_time(datetime.now()) + logger.debug(" Exiting handle_cushion_feedback() early: You sat well :)") + user.set_last_cushion_time(datetime.now()) return True + # If posture not good enough, turn buzzer on buzzer_start_time = datetime.now() GPIO.output(CUSHION_GPIO_PIN, GPIO.HIGH) - print(" buzzer on") + logger.debug(" buzzer on") while datetime.now() < buzzer_start_time + CUSHION_ACTIVE_INTERVAL: - # Can add extra code here if necessary. This WILL halt execution of this thread. sleep_ms(100) + # Turn buzzer off GPIO.output(CUSHION_GPIO_PIN, GPIO.LOW) - print(" buzzer off") + logger.debug(" buzzer off") - auspost.set_last_cushion_time(datetime.now()) + user.set_last_cushion_time(datetime.now()) return True -def handle_plant_feedback(auspost: ControlledData) -> bool: +def handle_plant_feedback(user: ControlledData) -> bool: """ Set the plant height according to short-term current session data, and update the timestamp of when plant feedback was last given. Args: - auspost: Data encapsulating the current state of the program. + user: Data encapsulating the current state of the program. Returns: (bool): True, always. If you get a False return value, then something has gone VERY wrong. Requires: - ! auspost.is_failed() + ! user.is_failed() Ensures: - ! auspost.is_failed() + ! user.is_failed() """ - print(" handle_plant_feedback()") + logger.debug(" handle_plant_feedback()") now = datetime.now() - if now > auspost.get_last_plant_time() + HANDLE_PLANT_FEEDBACK_TIMEOUT: + if now > user.get_last_plant_time() + HANDLE_PLANT_FEEDBACK_TIMEOUT: # Get the most recent posture data for the user recent_posture_data = get_user_postures( - auspost.get_user_id(), + user.get_user_id(), num=-1, period_start=now - GET_POSTURE_DATA_TIMEOUT, period_end=now, ) - # Conditions for exiting early + # Exit if no data if len(recent_posture_data) == 0: - print(" Exiting handle_plant_feedback() early: No data") - auspost.set_last_plant_time(datetime.now()) + logger.debug(" Exiting handle_plant_feedback() early: No data") + user.set_last_plant_time(datetime.now()) return True + # Exit if person not in frame enough average_prop_in_frame = sum( [posture.prop_in_frame for posture in recent_posture_data] ) / len(recent_posture_data) if average_prop_in_frame < PROPORTION_IN_FRAME_THRESHOLD: - print( + logger.debug( " Exiting handle_plant_feedback() early: Not in frame for a high enough proportion of time." ) - auspost.set_last_plant_time(datetime.now()) + user.set_last_plant_time(datetime.now()) return True - # Calculate average proportion + # Calculate average proportion of good posture average_prop_good = sum( [posture.prop_good for posture in recent_posture_data] ) / len(recent_posture_data) - # Judge. + # Raise plant 1 'level' if posture is good, otherwise lower it 1. if average_prop_good >= PLANT_PROPORTION_GOOD_THRESHOLD: hardware.set_plant_height(hardware.plant_height + 1) else: hardware.set_plant_height(hardware.plant_height - 1) - auspost.set_last_plant_time(datetime.now()) + user.set_last_plant_time(datetime.now()) return True def _reset_garden() -> None: """Reset data, faces and hardware.""" - print(" Burning the garden to the ground...") + logger.debug(" Burning the garden to the ground...") global hardware destroy_database() - print("\t initialising database anew...") + logger.debug("\t initialising database anew...") init_database() - print("\t resetting face embeddings...") + logger.debug("\t resetting face embeddings...") reset_registered_face_embeddings() - print("\t initialising hardware...") + logger.debug("\t initialising hardware...") hardware = initialise_hardware() - print("\t Like a phoenix, the Sitting Desktop Garden rises anew") + logger.debug("\t Like a phoenix, the Sitting Desktop Garden rises anew") # LAUNCH diff --git a/client/overlord_overlord.py b/client/overlord_overlord.py index 89f4634..9fca930 100644 --- a/client/overlord_overlord.py +++ b/client/overlord_overlord.py @@ -18,7 +18,7 @@ def spawn_camera_overlord(): def spawn_pi_overlord(no_posture_model): - cmd = [PYTHON_DEFAULT, "client/drivers/main.py"] + cmd = [PYTHON_DEFAULT, "client/drivers/pi_overlord.py"] if no_posture_model: cmd.append("--no-posture-model") subprocess.run(cmd) diff --git a/client/proof_of_concept/__init__.py b/client/proof_of_concept/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/client/proof_of_concept/buttons.py b/client/proof_of_concept/buttons.py deleted file mode 100644 index c31dbab..0000000 --- a/client/proof_of_concept/buttons.py +++ /dev/null @@ -1,112 +0,0 @@ -""" -File: - sitting-desktop-garden/client/proof_of_concept/buttons.py - -Purpose: - Hardware test for the PiicoDev buttons. - -Author: - Gabriel Field (47484306) - -Source: - Largely based on https://core-electronics.com.au/guides/piicodev-button-getting-started-guide/#B62QRRH -""" - -## SECTION: Imports - -from PiicoDev_Switch import PiicoDev_Switch # Switch may be used for other types of PiicoDev Switch devices -from PiicoDev_Unified import sleep_ms - - - -## SECTION: main() - -def main(): - print("==== TEST: Chungus on one button ====") - button = PiicoDev_Switch(double_press_duration = 400) # Double-presses are those that happen with interval <= 400ms - basic_press_test(button) - led_test(button) - double_press_test(button) - - print(" Set button IDs to 0000 and 1110.") - print(" Do this by setting switches down for '0' and up for '1'.") - print(" You have 10 seconds...") - sleep_ms(1000 * 10) - print(" Assuming the button IDs have been set.") - - print("==== TEST: Chungi on two buttons ====") - button0000 = PiicoDev_Switch(id = [0, 0, 0, 0]) - button1110 = PiicoDev_Switch(id = [1, 1, 1, 0]) - two_chungi(button0000, button1110) - - print("==== TEST: COMPLETE! ====") - - - -## TEST: One-button - -def basic_press_test(button: PiicoDev_Switch): - print(" BEGIN basic_press_test()") - sleep_ms(2000) - cycle = 0 - press_count = 0 - while cycle < 10: - print("\tCycle: " + str(cycle)) - press_count += button.press_count - print("\t\tWas pressed: " + str(button.was_pressed)) - print("\t\tIs pressed: " + str(button.is_pressed)) - print("\t\tTotal press count: " + str(press_count)) - cycle += 1 - sleep_ms(1000) - print(" END basic_press_test()") - -def led_test(button): - print(" BEGIN led_test()") - sleep_ms(2000) - cycle = 0 - button.led = False - thinking_led = False - while cycle < 10: - print("\tCycle: " + str(cycle)) - button.led = not button.led - thinking_led = False - print("\tButton should be on: " + str(thinking_led)) - cycle += 1 - sleep_ms(1000) - print(" END led_test()") - -def double_press_test(button): - print(" BEGIN double_press_test()") - sleep_ms(2000) - cycle = 0 - while cycle < 10: - print("\tCycle: " + str(cycle)) - print("\tWas double-pressed: " + str(button.was_double_pressed)) - cycle += 1 - sleep_ms(1000) - print(" END double_press_test()") - - - -## TEST: Two buttons - -def two_chungi(button0000, button1110): - print(" BEGIN two_chungi()") - sleep_ms(2000) - cycle = 0 - button0000.press_count # Clear the count - button1110.press_count # Clear the count - while cycle < 10: - print("\tCycle: " + str(cycle)) - print("\tButton with id 0000 pressed this many times since last time: " + str(button0000.press_count)) - print("\tButton with id 1110 pressed this many times since last time: " + str(button1110.press_count)) - cycle += 1 - sleep_ms(1000) - print(" END two_chungi()") - - - -## LAUNCH - -if __name__ == "__main__": - main() diff --git a/client/proof_of_concept/camera.py b/client/proof_of_concept/camera.py deleted file mode 100644 index ece4377..0000000 --- a/client/proof_of_concept/camera.py +++ /dev/null @@ -1,125 +0,0 @@ -""" -File: - sitting-desktop-garden/client/proof_of_concept/buttons.py - -Purpose: - Hardware test for the PiicoDev buttons. - -Author: - Gabriel Field (47484306) - -Source: - Largely based on https://projects.raspberrypi.org/en/projects/getting-started-with-picamera/0 -""" - -## SECTION: Imports - -from picamera2 import Picamera2, Preview -from PiicoDev_Unified import sleep_ms -from time import time - - - -## SECTION: main() - -def main(): - print("==== TEST: Preview ====") - picam2 = Picamera2() - test_preview(picam2) - - print("==== TEST: Pics ====") - test_pics(picam2) - - print("==== TEST: Vids ====") - test_vids(picam2) - - print("==== TEST: COMPLETE! ====") - - - -## TEST: Preview - -def test_preview(picam2): - print(" BEGIN test_preview()") - sleep_ms(2000) - # Preview for 10 seconds - print("\tStarting preview for 10 seconds... Requires monitor!!") - picam2.start_preview(Preview.QTGL) - picam2.start() - sleep_ms(10000) - picam2.close() - print(" END test_preview()") - - - -## TEST: Pics - -def test_pics(picam2): - print(" BEGIN test_pics()") - sleep_ms(2000) - # Take a single picture - print("\tTaking a picture to ./camera_dump/picture-00.jpg") - start = time() - picam2.start_and_capture_file("./camera_dump/picture-00.jpg") - picam2.close() - end = time() - print("\t\t^done. Took " + str(end - start) + " seconds") - sleep_ms(5000) - # Take many pictures - print("\tTaking 10 pictures to ./camera_dump-pictures-?.jpg for ? = 0 .. 9, with delay 0.1 seconds") - start = time() - picam2.start_and_capture_files("./camera_dump/pictures-{:d}.jpg", num_files = 10, delay = 0.1) - picam2.close() - end = time() - print("\t\t^done. Took " + str(end - start) + " seconds") - sleep_ms(5000) - print(" END test_pics()") - - - -## TEST: Vids - -def test_vids(picam2): - print(" BEGIN test_vids()") - sleep_ms(2000) - # Take a single video for 10 seconds - print("\tTaking a 10-second video to ./camera_dump/video-00.jpg") - start = time() - picam2.start_and_record_video("./camera_dump/video-00.jpg", duration = 10, show_preview = False) - picam2.close() - end = time() - print("\t\t^done. Took " + str(end - start) + " seconds") - sleep_ms(5000) - # Taking many short videos - print("\tTaking ten 1-second videos to ./camera_dump/shorts-?.jpg for ? = 0 .. 9") - start = time() - for i in range(10): - picam2.start_and_record_video("./camera_dump/shorts-" + str(i) + ".jpg", duration = 1, show_preview = False) - picam2.close() - end = time() - print("\t\t^done. Took " + str(end - start) + " seconds") - # Taking many tiny videos - start = time() - for i in range(100): - print("\tTaking 100 0.1-second videos to ./camera_dump/tiny-%02f" % i, duration = 0.1, show_preview = False) - picam2.close() - end = time() - print("\t\t^done. Took " + str(end - start) + " seconds") - sleep_ms(5000) - # Take many short videos without .close() - print("\t Taking ten 1-second videos without .close() to ./camera_dump/open-?.jpg for ? = 0 .. 9") - start = time() - for i in range(10): - picam2.start_and_record_video("./camera_dump/open-" + str(i) + ".jpg", duration = 1, show_preview = False) - picam2.close() - end = time() - print("\t\t^done. Took " + str(end - start) + " seconds") - sleep_ms(5000) - print(" END test_vids()") - - - -## LAUNCH - -if __name__ == "__main__": - main() diff --git a/client/proof_of_concept/cushion.py b/client/proof_of_concept/cushion.py deleted file mode 100644 index 628ebf0..0000000 --- a/client/proof_of_concept/cushion.py +++ /dev/null @@ -1,5 +0,0 @@ -""" -I have no idea how this is gonna work. We'll need to do our own PWM or something ig. -Will only do this after I have the hardware to play with properly. -~Gabe -""" diff --git a/client/proof_of_concept/display.py b/client/proof_of_concept/display.py deleted file mode 100644 index bbf1c22..0000000 --- a/client/proof_of_concept/display.py +++ /dev/null @@ -1,264 +0,0 @@ -""" -File: - sitting-desktop-garden/client/proof_of_concept/display.py - -Purpose: - Hardware test for the PiicoDev display. - -Author: - Gabriel Field (47484306) - -Source: - Largely based on https://core-electronics.com.au/guides/raspberry-pi/piicodev-oled-ssd1306-raspberry-pi-guide/ - -NOTE: - Ensure that I2C is enabled on the RPi first! -""" - -## SECTION: Imports - -from PiicoDev_SSD1306 import * -from PiicoDev_Unified import sleep_ms # cross-platform compatible sleep function -from math import sin, cos, pi -import random - - - -## SECTION: How the .graph2D constructor works - -""" -PiicoDev_SSD1306 - .graph2D - .__init__( - self, - originX = 0, originY = HEIGHT-1, # SSD coordinates of BOTTOM-LEFT CORNER of the graph. Units: pixels - width = WIDTH, height = HEIGHT, # Width and height that the graph TAKES UP ON THE SSD. Units: pixels. - minValue = 0, maxValue = 255, # Minimum and maximum vertical value that can be dispalyed on the graph. Unitless. - # Note: horizontal values for graph are *literally* horizontal pixels. - c = 1, # Colour in which to draw graph. - bars = False # Use True for a bar chart. We'll probably never use this. - ) -""" - - - -## SECTION: main() - -def main(): - display = create_PiicoDev_SSD1306() - - print("Screen width is " + str(WIDTH)) - print("Screen height is " + str(HEIGHT)) - sleep_ms(2000) - - print("==== TEST: Basic functionality ====") - draw_lines(display) - draw_rectangles(display) - - print("==== TEST: Displaying text and images ====") - draw_text(display) - draw_images(display) - - print("==== TEST: Graph ====") - draw_graph(display) - - print("==== TEST: COMPLETE! ====") - - - -## TEST: Basic functionality - -def draw_lines(display): - print(" BEGIN draw_lines()") - sleep_ms(2000) - # Draw a line from (0, 0) to (WIDTH, HEIGHT) - print("\tDrawing line from (0, 0) to (WIDTH, HEIGHT)") - start_x = 0 - start_y = 0 - end_x = WIDTH - end_y = HEIGHT - colour = 1 # COLOURS: `0` for black or `1` for white - display.line(start_x, start_y, end_x, end_y, colour) # Logically draw this line - display.show() # Render all things logically drawn - sleep_ms(5000) - # Blank the screen - print("\tBlanking the screen") - display.fill(0) # Parameter is a COLOUR, either `0` for black or `1` for white. - display.show() # Render - sleep_ms(1000) - # Draw a funny thing - print("\tDrawing a funny shape") - points = [None] * 5 - for i in range(5): - points[i] = ( WIDTH / 2 + cos(2 * pi / 5 * i) * HEIGHT / 2 , # how tf do you continue a line again in python - HEIGHT / 2 + sin(2 * pi / 5 * i) * HEIGHT / 2 ) - for i in range(5): - display.line(points[i][0], points[i][1], points[(i + 2) % 5][0], points[(i + 2) % 5][1], 1) - display.line(points[i][0], points[i][1], points[(i + 3) % 5][0], points[(i + 3) % 5][1], 1) - display.show() # Render - sleep_ms(5000) - # Blank the screen - print("\tBlanking the screen") - display.fill(0) - display.show() - print(" END draw_lines()") - -def draw_rectangles(display): - print(" BEGIN draw_rectangles()") - sleep_ms(2000) - # Draw an unfilled rectangle - print("\tDrawing an unfilled rectangle") - start_x = 0 - start_y = 0 - end_x = WIDTH / 2 - end_y = HEIGHT / 2 - colour = 1 # white - display.rect(start_x, start_y, end_x, end_y, colour) # UNFILLED rectangle - display.show() # render - sleep_ms(5000) - # Blank screen - print("\tBlanking the screen") - display.fill(0) - display.show() - # Draw a filled rectangle - print("\tDrawing a filled rectangle") - display.fill_rect(start_x, start_y, end_x, end_y, colour) # FILLED rectangle - display.show() # render - sleep_ms(5000) - # Blank the screen - print("\tBlanking the screen") - display.fill(0) - display.show() - print(" END draw_rectangles()") - - - -## TEST: Text and images - -def draw_text(display): - print(" BEGIN draw_text()") - sleep_ms(2000) - - # This should link to the font-pet-me-128.dat file in the current directory. - # **I have no idea how to change the location where we store this file.** - - # Hello world - print("\tDrawing hello world") - text_to_display = "Hello, World!" - top_left_x = 0 - top_left_y = 0 - colour = 1 - display.text(text_to_display, top_left_x, top_left_y, colour) - display.show() # render - sleep_ms(5000) - # Long string - print("\tDrawing long string") - display.text("The quick brown fox jumped over the lazy dog", 0, 15, 1) - display.show() - sleep_ms(5000) - # Blank screen - print("\tBlanking screen") - display.fill(0) - display.show() - sleep_ms(5000) - # Goodbye world - print("\tDrawing goodbye world") - display.text("Goodbye, world T-T", 0, 0, 1) - display.show() - sleep_ms(5000) - # Blank the screen - print("\tBlanking the screen") - display.fill(0) - display.show() - print(" END draw_text") - -def draw_images(display): - print(" BEGIN draw_images()") - sleep_ms(2000) - # Images are drawn from .pbm (Portable BitMap image) files. - # They should have dimensions 128 x 64. - - # Draw PiicoDev logo - print("\tDisplaying PiicoDev logo") - colour = 1 - display.load_pbm("./resources/piicodev-logo.pbm", colour) - display.show() # render - sleep_ms(5000) - # Blank screen - print("\tBlanking screen") - display.fill(0) - display.show() - sleep_ms(5000) - # Draw hilarious image - print("\tDisplaying hilarious image") - display.load_pbm("./resources/hilarious.pbm", 1) - display.show() - sleep_ms(5000) - # Blank screen - print("\tBlanking screen") - display.fill(0) - display.show() - print(" END draw_images()") - - - -## TEST: Graph - -def draw_graph(display): - print(" BEGIN draw_graph()") - sleep_ms(2000) - - curve = display.graph2D(minValue = -1, maxValue = 1, height = HEIGHT, width = WIDTH) - # minValue= key is for the minimum value on the logical graph. This gets mapped into pixels automatically. - # maxValue= key is for the maximum value on the logical graph. This gets mapped into pixels automatically. - # height= key is for maximum display height. - # width= key is for maximum display width. - - # Display a sine curve - print("\tDisplaying a sine curve") - for x in range(WIDTH): - y_value = sin(2 * pi * x / WIDTH) - display.fill(0) # NOTE: I don't know if this is necessary or not. - display.updateGraph2D(curve, y_value) # Add to the graph - display.show() # render - sleep_ms(5000) - # Add to the same curve - print("\tAdding another period to that curve, slowly") - for x in range(WIDTH): - display.fill(0) # When you add to the curve, I reckon this will mean that the whole graph overwrites the previous one. Otherwise, you'd get white garbage all over the screen - display.updateGraph2D(curve, sin(2 * pi * x / WIDTH)) # idk what this will do - display.show() - sleep_ms(100) # wait 1/10 of a second so that we can see what this does - sleep_ms(5000) - # Use a new curve to plot random values - print("\tPlotting random values on a new curve, slowly") - curve = display.graph2D(minValue = 0, maxValue = 1) # omitted height= and width= keys default to HEIGHT and WIDTH - for _ in range(WIDTH * 2): - display.fill(0) - display.updateGraph2D(curve, random.random()) - display.show() - sleep_ms(100) - sleep_ms(5000) - # Plot two curves simultaneously - print("\tPlotting two curves simultaneously") - linear = display.graph2D(minValue = 0, maxValue = WIDTH) - sinusoidal = display.graph2D(minValue = -1, maxValue = 1) - for x in range(WIDTH): - display.fill(0) - display.updateGraph2D(linear, x) - display.updateGraph2D(sinusoidal, sin(2 * pi * x / WIDTH)) - display.show() - sleep_ms(5000) - # Blank display - print("\tBlanking display") - display.fill(0) - display.show() - print(" END draw_graph()") - - - -## LAUNCH - -if __name__ == "__main__": - main() - diff --git a/client/proof_of_concept/flood_database_with_data.py b/client/proof_of_concept/flood_database_with_data.py deleted file mode 100644 index 94592dc..0000000 --- a/client/proof_of_concept/flood_database_with_data.py +++ /dev/null @@ -1,59 +0,0 @@ -""" -File: - sitting-desktop-garden/client/proof_of_concept/flood_database_with_data.py - -Purpose: - Providing debug test data - -Author: - Gabriel Field (47484306) - -Source: - Largely based on https://projects.raspberrypi.org/en/projects/getting-started-with-picamera/0 -""" - -## SECTION: Imports - -from datetime import datetime, timedelta -import random - -import data.routines as db - - - -## SECTION: Moses - -NUMBER_OF_SECONDS_IN_AN_HOUR = 3600 -"""title""" - -def DEBUG_flood() -> None: - """ - Flood the database with debug test data, for user with ID 1, set up for THE NEXT HOUR. - The debug test data includes a random posture record for each SECOND of the hour. - """ - print(" Killing database") - db.destroy_database() - print(" Initialising database") - db.init_database() - print(" Making user") - user_id = db.create_user() - print(f" {user_id=}") - print(" Flooding with posture records...") - now = datetime.now() - for second in range(NUMBER_OF_SECONDS_IN_AN_HOUR): - db.save_posture(posture = db.Posture( - id_ = None, - user_id = user_id, - prop_good = random.random(), - prop_in_frame = random.random(), - period_start = now + timedelta(seconds = second), - period_end = now + timedelta(seconds = second + 1) - )) - print(" Database flooded. Exiting.") - return - - -## LAUNCH - -if __name__ == "__main__": - DEBUG_flood() diff --git a/client/proof_of_concept/font-pet-me-128.dat b/client/proof_of_concept/font-pet-me-128.dat deleted file mode 100644 index ad4d9e3..0000000 Binary files a/client/proof_of_concept/font-pet-me-128.dat and /dev/null differ diff --git a/client/proof_of_concept/gpio.py b/client/proof_of_concept/gpio.py deleted file mode 100644 index a79a37b..0000000 --- a/client/proof_of_concept/gpio.py +++ /dev/null @@ -1,57 +0,0 @@ -""" -File: - sitting-desktop-garden/client/proof_of_concept/gpio.py - -Purpose: - Hardware test for the GPIO pins, interfacing through RPi.GPIO. - -Author: - Gabriel Field (47484306) - -Source: - Largely based on https://elinux.org/RPi_GPIO_Code_Samples#RPi.GPIO - See also https://elinux.org/RPi_Low-level_peripherals#General_Purpose_Input.2FOutput_.28GPIO.29 -""" - -## SECTION: Imports - -import RPi.GPIO as GPIO # WARNING: This should be installed by default on the RPi, - # but not on your local machine. - # I haven't `poetry add`ed it (I tried, but it failed >:( ). -#from PiicoDev_Unified import sleep_ms -from time import sleep - - - -## SECTION: main() - -def main(): - print(" BEGIN main()...") - GPIO.setmode(GPIO.BCM) # use pin numbering convention as per the PiicoDev header - - # Set data directions - GPIO.setup(7, GPIO.IN) - GPIO.setup(8, GPIO.OUT) - - print(" Will read from pin 7 into code; will write to pin 8.") - print(" Doing that in 5 seconds...") - sleep(5) - - input_value = GPIO.input(7) - print(" Read " + str(input_value) + " from pin 7") - GPIO.output(8, GPIO.HIGH) - print(" Wrote high to pin 8") - - print(" Writing low to pin 8 in 2 seconds") - sleep(2) - GPIO.output(8, GPIO.LOW) - - print(" END main()") - - - -## LAUNCH - -if __name__ == "__main__": - main() - diff --git a/client/proof_of_concept/resources/hilarious.pbm b/client/proof_of_concept/resources/hilarious.pbm deleted file mode 100644 index 01006c9..0000000 Binary files a/client/proof_of_concept/resources/hilarious.pbm and /dev/null differ diff --git a/client/proof_of_concept/resources/piicodev-logo.pbm b/client/proof_of_concept/resources/piicodev-logo.pbm deleted file mode 100644 index 9f12b5b..0000000 Binary files a/client/proof_of_concept/resources/piicodev-logo.pbm and /dev/null differ diff --git a/client/proof_of_concept/servo_driver.py b/client/proof_of_concept/servo_driver.py deleted file mode 100644 index 2d93049..0000000 --- a/client/proof_of_concept/servo_driver.py +++ /dev/null @@ -1,101 +0,0 @@ -""" -File: - sitting-desktop-garden/client/proof_of_concept/servo_driver.py - -Purpose: - Hardware test for the PiicoDev servos and server driver. - -Author: - Gabriel Field (47484306) - -Source: - Largely based on https://core-electronics.com.au/guides/piicodev/piicodev-servo-driver-pca9685-getting-started-guide/#JEFNE3E -""" - -## SECTION: Imports - -from PiicoDev_Servo import PiicoDev_Servo, PiicoDev_Servo_Driver -from PiicoDev_Unified import sleep_ms -from math import sin, pi - - - -## SECTION: main() - -def main(): - print("==== TEST: Basic use ====") - controller = PiicoDev_Servo_Driver() # Initialise the Servo Driver Module - servo = PiicoDev_Servo(controller, 1) # Simple setup: Attach a servo to channel 1 of the controller with default properties - test_step(servo) - continuous_servo = PiicoDev_Servo(controller, 1, midpoint_us=1500, range_us=1800) # Connect a 360° servo to channel 2 - test_spinny(continuous_servo) - - print(" Hook up two servos, with channels 1 and 2") - print(" You have 10 seconds") - sleep_ms(10000) - print(" Assuming they're allg") - - print("==== TEST: Multiple servos ====") - - - print("==== TEST: COMPLETE! ====") - servo1 = PiicoDev_Servo(controller, 1) - servo2 = PiicoDev_Servo(controller, 2) - test_multiple(servo1, servo2) - - -## TEST: Basic use - -def test_step(servo): - print(" BEGIN test_step()") - sleep_ms(2000) - print("\tStepping the servo") - servo.angle = 0 - sleep_ms(1000) - servo.angle = 90 - print(" END test_step()") - -def test_spinny(continuous_servo): - print(" BEGIN test_spinny()") - sleep_ms(2000) - print("\tStopping") - continuous_servo.speed = 0 - sleep_ms(5000) - print("\tGoing full-speed forwards") - continuous_servo.speed = 1 - sleep_ms(5000) - print("\tGoing half-speed forwards") - continuous_servo.speed = 0.5 - sleep_ms(5000) - print("\tGoing half-speed backwards") - continuous_servo.speed = -0.5 - sleep_ms(5000) - print("\tGoing full-speed backwards") - continuous_servo.speed = -1 - sleep_ms(5000) - print("\tStopping") - continuous_servo.speed = 0 - sleep_ms(5000) - print(" END test_spinny()") - - - -## TEST: Multiple servos - -def test_multiple(servo1, servo2): - print(" BEGIN test_multiple()") - sleep_ms(2000) - # This takes 18 seconds - for i in range(1800): - servo1.angle = 90 + 90 * sin(i * 2 * pi / 1800) - servo2.angle = 90 - 90 * sin(i * 2 * pi / 1800) - sleep_ms(10) - print(" END test_multiple()") - - - - -## LAUNCH - -if __name__ == "__main__": - main()