Skip to content
This repository has been archived by the owner on Dec 8, 2024. It is now read-only.

Obliteration #90

Merged
merged 5 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 6 additions & 73 deletions client/drivers/data_structures.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,21 +135,6 @@ def make_failed(cls) -> "ControlledData":

# SECTION: Getters/Setters

def DEBUG_get_next_posture_graph_value(self) -> int:
"""
Get next thing to put on the DEBUG graph.

Returns:
(int): Next thing to put on the DEBUG graph.

TODO: Remove this method
"""
return_me = self._DEBUG_current_graph_function(
self._DEBUG_current_graph_list_index
)
self._DEBUG_current_graph_list_index += 1
return return_me

def is_failed(self) -> bool:
"""
Returns:
Expand Down Expand Up @@ -216,21 +201,6 @@ def set_last_plant_time(self, time: datetime) -> None:
"""
self._last_plant_time = time

def get_last_sniff_time(self) -> datetime:
"""
Returns:
(datetime): The last time that the user was provided olfactory feedback.
"""
return self._last_cushion_time

def set_last_sniff_time(self, time: datetime) -> None:
"""
Args:
time : datetime
The last time that the user was provided olfactory feedback.
"""
self._last_sniff_time = time

def accept_new_posture_data(
self, posture_data: List[float]
) -> None: # TODO: Refine type signature
Expand All @@ -249,47 +219,6 @@ def accept_new_posture_data(
for datum in posture_data:
self._posture_data.put_nowait(datum)

# SECTION: Posture data mapping

def get_cushion_posture_data(
self,
) -> "CUSHION_POSTURE_DATA": # TODO: Decide what this type looks like
"""
Returns:
(CUSHION_POSTURE_DATA): Posture data necessary for cushion feedback.
TODO: Implement this.
"""
# DEBUG:
print("<!> WARNING: get_cushion_posture_data() not implemented!")
# :DEBUG
return None

def get_plant_posture_data(
self,
) -> "PLANT_POSTURE_DATA": # TODO: Decide what this type looks like
"""
Returns:
(PLANT_POSTURE_DATA) Posture data necessary for plant feedback.
TODO: Implement this.
"""
# DEBUG:
print("<!> WARNING: get_plant_posture_data() not implemented!")
# :DEBUG
return None

def get_sniff_posture_data(
self,
) -> "SNIFF_POSTURE_DATA": # TODO: Decide what this type looks like
"""
Returns:
(SNIFF_POSTURE_DATA): Posture data necessary for scent feedback.
TODO: Implement this.
"""
# DEBUG:
print("<!> WARNING: get_sniff_posture_data() not implemented!")
# :DEBUG
return None


## SECTION: Hardware packaged together

Expand Down Expand Up @@ -574,15 +503,19 @@ def oled_display_texts(self, texts: List[str], x: int, y: int, colour: int) -> i
)
return display_height_offset

def send_message(self, message: str, message_time: int = 1) -> None:
def send_message(self, messages: List[str], message_time: int = 1) -> None:
"""Clear the screen and display message

Args:
message: Message to send to the user
message_time: Time (seconds) to sleep for after displaying message.
"""
self.display.fill(0)
self.oled_display_text(message, 0, 0, 1)
display_height_offset = 0
for text in messages:
display_height_offset = self.oled_display_text(
text, 0, 0 + display_height_offset, 1
)
self.display.show()
time.sleep(message_time)

Expand Down
32 changes: 16 additions & 16 deletions client/drivers/login_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
Status.TOO_MANY_FACES.value: "Too many faces detected",
Status.NO_MATCH.value: "Could not match face",
}
QUIT_INSTRUCTIONS = "Right button to quit"
QUIT_INSTRUCTIONS = "Right: quit"

Action = Callable[[HardwareComponents], int]

Expand All @@ -44,9 +44,9 @@ def handle_authentication(hardware: HardwareComponents) -> int:
while True:
_log_and_send(
hardware,
"Left button to login\n"
"Right button to register\n"
"Double press right button to reset data",
["Left: login",
"Right: register",
"Double press right: reset data"]
)
button = hardware.wait_for_button_press()

Expand Down Expand Up @@ -82,8 +82,8 @@ def _loop_action(hardware: HardwareComponents, action: Action) -> int:

def _attempt_login(hardware: HardwareComponents) -> int:
capturer = RaspCapturer()
message = f"Press left button to take photo\n{QUIT_INSTRUCTIONS}"
_log_and_send(hardware, message, message_time=0)
messages = ["Left: take photo", f"{QUIT_INSTRUCTIONS}"]
_log_and_send(hardware, messages, message_time=0)

button_pressed = hardware.wait_for_button_press()
if button_pressed == LEFT_BUTTON:
Expand All @@ -92,7 +92,7 @@ def _attempt_login(hardware: HardwareComponents) -> int:
if button_pressed == RIGHT_BUTTON:
return QUIT

_log_and_send(hardware, "Trying login...", message_time=0)
_log_and_send(hardware, ["Trying login..."], message_time=0)
status = get_face_match(face)
_handle_status_message(hardware, status)

Expand All @@ -105,11 +105,11 @@ def _attempt_register(hardware: HardwareComponents) -> int:
# Capture NUM_FACES faces
faces: list[np.ndarray] = []
for i in range(NUM_FACES):
message = (
f"Press left button to take photo {i + 1}/{NUM_FACES}\n"
messages = [
f"Left: take photo {i + 1}/{NUM_FACES}",
f"{QUIT_INSTRUCTIONS}"
)
_log_and_send(hardware, message, message_time=0)
]
_log_and_send(hardware, messages, message_time=0)

button_pressed = hardware.wait_for_button_press()
if button_pressed == RIGHT_BUTTON:
Expand All @@ -120,13 +120,13 @@ def _attempt_register(hardware: HardwareComponents) -> int:
faces.append(frame)

# Try register faces
_log_and_send(hardware, "Registering...")
_log_and_send(hardware, ["Registering..."])
user_id = next_user_id()
status = register_faces(user_id, faces)

if status == Status.OK.value:
create_user()
_log_and_send(hardware, "Registration successful!")
_log_and_send(hardware, ["Registration successful!"])
return user_id

_handle_status_message(hardware, status)
Expand All @@ -144,7 +144,7 @@ def _handle_status_message(hardware: HardwareComponents, status: int) -> None:


def _log_and_send(
hardware: HardwareComponents, message: str, message_time: int = 1
hardware: HardwareComponents, messages: list[str], message_time: int = 1
) -> None:
logger.debug(message)
hardware.send_message(message, message_time=message_time)
logger.debug(messages)
hardware.send_message(messages, message_time=message_time)
90 changes: 20 additions & 70 deletions client/drivers/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,6 @@
FIXME: Fine-tune this value later.
"""

# KILLME:
HANDLE_SNIFF_FEEDBACK_TIMEOUT = timedelta(milliseconds=20000)
""" Minimum delay between consecutive uses of the scent bottle-controlling servos. Used in handle_feedback(). """

DEBUG_DO_EVERYTHING_INTERVAL = 1000
""" DEBUG Number of milliseconds between each loop iteration in do_everything(). """

Expand Down Expand Up @@ -146,7 +142,6 @@ def initialise_hardware() -> HardwareComponents:

TODO: Complete the function with all of the hardware peripherals (incrementally, as they get integrated).
"""
print("<!> initialise_hardware()") # DEBUG
return_me = HardwareComponents.make_fresh()
# Clear button queues
return_me.button0.was_pressed
Expand All @@ -157,7 +152,6 @@ def initialise_hardware() -> HardwareComponents:
# Write low to stop buzzer from mistakenly buzzing, if necessary
GPIO.output(CUSHION_GPIO_PIN, GPIO.LOW)

print("<!> initialise_hardware() FINISHED") # DEBUG
return return_me


Expand All @@ -175,11 +169,7 @@ def do_everything(auspost: ControlledData) -> None:
(auspost : ControlledData): Data encapsulating the current state of the program.
Requires:
! auspost.is_failed()

TODO: Actually implement this
"""
print("<!> BEGIN do_everything()")

LOGIN_MESSAGE = "Logged in with user id: " + str(auspost.get_user_id())
LOGOUT_MESSAGE = "Logged out user id " + str(auspost.get_user_id())

Expand All @@ -206,7 +196,6 @@ def do_everything(auspost: ControlledData) -> None:
hardware.display.show()

while True:
# Loop invariant: ! auspost.is_failed()
# Check for user logout
if hardware.button0.was_pressed:
hardware.display.fill(0)
Expand All @@ -217,8 +206,7 @@ def do_everything(auspost: ControlledData) -> None:
return

update_display_screen(auspost)
# handle_posture_monitoring(auspost)
handle_posture_monitoring_new(auspost)
handle_posture_graph(auspost)
handle_feedback(auspost)

sleep_ms(DEBUG_DO_EVERYTHING_INTERVAL)
Expand All @@ -242,8 +230,6 @@ def update_display_screen(auspost: ControlledData) -> bool:
Ensures:
! auspost.is_failed()
"""
print("<!> BEGIN update_display_screen()")

while (
not auspost.get_posture_data().empty()
): # NOTE: This is much more robust than getting a fixed number of things out of the queue
Expand All @@ -256,16 +242,25 @@ def update_display_screen(auspost: ControlledData) -> bool:
)
hardware.display.show()

print("<!> END update_display_screen()")
return True


def handle_posture_monitoring_new(auspost: ControlledData) -> bool:
def handle_posture_graph(auspost: ControlledData) -> bool:
"""
Get a snapshot of the user's posture data.
Use this information to update the data for the posture graph.

# DEBUG:
print("<!> handle_posture_monitoring_new()")
# :DEBUG
Args:
(auspost : ControlledData): Data encapsulating the current state of the program.
Returns:
(bool): True, always. If you get a False return value, then something has gone VERY wrong.
Requires:
! auspost.is_failed()
Ensures:
! auspost.is_failed()

TODO: Check this
"""
now = datetime.now()

if now > auspost.get_last_snapshot_time() + GET_POSTURE_DATA_TIMEOUT:
Expand All @@ -277,16 +272,12 @@ def handle_posture_monitoring_new(auspost: ControlledData) -> bool:
period_end=now,
)

# DEBUG::
# Exit if not enough data
# if len(recent_posture_data) <= POSTURE_GRAPH_DATUM_WIDTH:
# Exit if no data
if len(recent_posture_data) == 0:
# ::DEBUG
print("<!> Exiting handle_posture_monitoring_new() early: Not enough data")
# auspost.set_last_snapshot_time(datetime.now())
return True

# Exit if not in frame enough
# Exit if person not in frame enough
average_prop_in_frame = sum(
[posture.prop_in_frame for posture in recent_posture_data]
) / len(recent_posture_data)
Expand All @@ -310,11 +301,12 @@ def handle_posture_monitoring_new(auspost: ControlledData) -> bool:
# Calculate the interval length
interval = total_time / POSTURE_GRAPH_DATUM_WIDTH

# Setup a sublist each representing 1 pixel on the graph
# Setup sublists, where each sublist is a portion of the overall data
split_posture_lists: list[list[Posture]]
split_posture_lists = [[] for _ in range(POSTURE_GRAPH_DATUM_WIDTH)]

# Sublists will be split by period_start
# What is in each sublist is determined by period_start
# We want an approximately equal amount of data in each sublist
for posture in recent_posture_data:
index = min(
POSTURE_GRAPH_DATUM_WIDTH - 1,
Expand All @@ -331,50 +323,13 @@ def handle_posture_monitoring_new(auspost: ControlledData) -> bool:
average_prop_good = sum(
[posture.prop_good for posture in posture_list]
) / len(posture_list)
# KILLME:
# print(f"<!> {average_prop_good=}")
# auspost.accept_new_posture_data([average_prop_good] * DEBUG_MULTIPLIER_CONSTANT) # DEBUG: 2024-10-06_20-16 Gabe: Fixed the typing by wrapping into a singleton list
# print(f"<!> Avg prop_good is {average_prop_good}")
new_prop_good_data += [average_prop_good] * POSTURE_GRAPH_DATUM_WIDTH
auspost.accept_new_posture_data(new_prop_good_data)

auspost.set_last_snapshot_time(now)

return True


# KILLME:
def handle_posture_monitoring(auspost: ControlledData) -> bool:
"""
Take a snapshot monitoring the user, and update the given ControlledData if necessary.

Args:
(auspost : ControlledData): Data encapsulating the current state of the program.
Returns:
(bool): True, always. If you get a False return value, then something has gone VERY wrong.
Requires:
! auspost.is_failed()
Ensures:
! auspost.is_failed()

TODO: Implement error handling
WARNING: UNTESTED!
"""
# DEBUG:
print("<!> handle_posture_monitoring()")
# :DEBUG
now = datetime.now()
if now > auspost.get_last_snapshot_time() + GET_POSTURE_DATA_TIMEOUT:
# TODO: The ai_bros_get_posture_data() call might fail once it's implemented properly.
# If it does, we need to handle it properly.
auspost.accept_new_posture_data([])
# DEBUG:
auspost.accept_new_posture_data([auspost.DEBUG_get_next_posture_graph_value()])
# :DEBUG
auspost.set_last_snapshot_time(now)
return True


def handle_feedback(auspost: ControlledData) -> bool:
"""
Provide feedback to the user if necessary.
Expand All @@ -397,9 +352,6 @@ def handle_feedback(auspost: ControlledData) -> bool:
if datetime.now() > auspost.get_last_plant_time() + HANDLE_PLANT_FEEDBACK_TIMEOUT:
if not handle_plant_feedback(auspost):
return False
if datetime.now() > auspost.get_last_sniff_time() + HANDLE_SNIFF_FEEDBACK_TIMEOUT:
if not handle_sniff_feedback(auspost):
return False

return True

Expand All @@ -420,8 +372,6 @@ def handle_cushion_feedback(auspost: ControlledData) -> bool:
! auspost.is_failed()
Ensures:
! auspost.is_failed()

TODO: Implement this method. Currently prints a debug statement and updates the time.
"""
# DEBUG:
print("<!> handle_cushion_feedback()")
Expand Down